Author: johan
Date: Thu Aug 14 10:58:30 2008
New Revision: 685979
URL: http://svn.apache.org/viewvc?rev=685979&view=rev
Log:
HADOOP-3935. Split out inner classes from DataNode.java. (johan)
Added:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (contents, props changed)
  - copied, changed from r685529, hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java
Removed:
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=685979&r1=685978&r2=685979&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Aug 14 10:58:30 2008
@@ -192,6 +192,8 @@
     HADOOP-3844. Include message of local exception in RPC client failures.
     (Steve Loughran via omalley)
 
+    HADOOP-3935. Split out inner classes from DataNode.java. (johan)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=685979&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Aug 14 10:58:30 2008
@@ -0,0 +1,969 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+/** A class that receives a block and writes it to its own disk, and in the
+ * meantime may also copy it to another site. If a throttler is provided,
+ * streaming throttling is also supported.
+ **/
+class BlockReceiver implements java.io.Closeable, FSConstants {
+ public static final Log LOG = DataNode.LOG;
+
+ private Block block; // the block to receive
+ protected boolean finalized;
+ private DataInputStream in = null; // from where data are read
+  private DataChecksum checksum; // checksum used to validate received chunks
+ private OutputStream out = null; // to block file at local disk
+ private DataOutputStream checksumOut = null; // to crc file at local disk
+ private int bytesPerChecksum;
+ private int checksumSize;
+ private ByteBuffer buf; // contains one full packet.
+ private int bufRead; //amount of valid data in the buf
+ private int maxPacketReadLen;
+ protected long offsetInBlock;
+ protected final String inAddr;
+ private String mirrorAddr;
+ private DataOutputStream mirrorOut;
+ private Daemon responder = null;
+ private BlockTransferThrottler throttler;
+ private FSDataset.BlockWriteStreams streams;
+ private boolean isRecovery = false;
+ private String clientName;
+ DatanodeInfo srcDataNode = null;
+ private Checksum partialCrc = null;
+ private DataNode datanode = null;
+
+ BlockReceiver(Block block, DataInputStream in, String inAddr,
+ boolean isRecovery, String clientName,
+                DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
+ try{
+ this.block = block;
+ this.in = in;
+ this.inAddr = inAddr;
+ this.isRecovery = isRecovery;
+ this.clientName = clientName;
+ this.offsetInBlock = 0;
+ this.checksum = DataChecksum.newDataChecksum(in);
+ this.bytesPerChecksum = checksum.getBytesPerChecksum();
+ this.checksumSize = checksum.getChecksumSize();
+ this.srcDataNode = srcDataNode;
+ this.datanode = datanode;
+ //
+ // Open local disk out
+ //
+ streams = datanode.data.writeToBlock(block, isRecovery);
+ this.finalized = datanode.data.isValidBlock(block);
+ if (streams != null) {
+ this.out = streams.dataOut;
+ this.checksumOut = new DataOutputStream(new BufferedOutputStream(
+ streams.checksumOut,
+ SMALL_BUFFER_SIZE));
+ // If this block is for appends, then remove it from periodic
+ // validation.
+ if (datanode.blockScanner != null && isRecovery) {
+ datanode.blockScanner.deleteBlock(block);
+ }
+ }
+ } catch(IOException ioe) {
+ IOUtils.closeStream(this);
+ throw ioe;
+ }
+ }
+
+ /**
+ * close files.
+ */
+ public void close() throws IOException {
+
+ IOException ioe = null;
+ // close checksum file
+ try {
+ if (checksumOut != null) {
+ checksumOut.flush();
+ checksumOut.close();
+ checksumOut = null;
+ }
+ } catch(IOException e) {
+ ioe = e;
+ }
+ // close block file
+ try {
+ if (out != null) {
+ out.flush();
+ out.close();
+ out = null;
+ }
+ } catch (IOException e) {
+ ioe = e;
+ }
+ // disk check
+ if(ioe != null) {
+ datanode.checkDiskError(ioe);
+ throw ioe;
+ }
+ }
+
+ /**
+ * Flush block data and metadata files to disk.
+ * @throws IOException
+ */
+ void flush() throws IOException {
+ if (checksumOut != null) {
+ checksumOut.flush();
+ }
+ if (out != null) {
+ out.flush();
+ }
+ }
+
+ /**
+ * While writing to mirrorOut, failure to write to mirror should not
+ * affect this datanode unless a client is writing the block.
+ */
+ private void handleMirrorOutError(IOException ioe) throws IOException {
+ LOG.info(datanode.dnRegistration + ":Exception writing block " +
+ block + " to mirror " + mirrorAddr + "\n" +
+ StringUtils.stringifyException(ioe));
+ mirrorOut = null;
+ //
+ // If stream-copy fails, continue
+ // writing to disk for replication requests. For client
+ // writes, return error so that the client can do error
+ // recovery.
+ //
+ if (clientName.length() > 0) {
+ throw ioe;
+ }
+ }
+
+ /**
+ * Verify multiple CRC chunks.
+ */
+ private void verifyChunks( byte[] dataBuf, int dataOff, int len,
+ byte[] checksumBuf, int checksumOff )
+ throws IOException {
+ while (len > 0) {
+ int chunkLen = Math.min(len, bytesPerChecksum);
+
+ checksum.update(dataBuf, dataOff, chunkLen);
+
+ if (!checksum.compare(checksumBuf, checksumOff)) {
+ if (srcDataNode != null) {
+ try {
+ LOG.info("report corrupt block " + block + " from datanode " +
+ srcDataNode + " to namenode");
+ LocatedBlock lb = new LocatedBlock(block,
+ new DatanodeInfo[] {srcDataNode});
+ datanode.namenode.reportBadBlocks(new LocatedBlock[] {lb});
+ } catch (IOException e) {
+ LOG.warn("Failed to report bad block " + block +
+ " from datanode " + srcDataNode + " to namenode");
+ }
+ }
+ throw new IOException("Unexpected checksum mismatch " +
+ "while writing " + block + " from " + inAddr);
+ }
+
+ checksum.reset();
+ dataOff += chunkLen;
+ checksumOff += checksumSize;
+ len -= chunkLen;
+ }
+ }
+
+ /**
+ * Makes sure buf.position() is zero without modifying buf.remaining().
+ * It moves the data if position needs to be changed.
+ */
+ private void shiftBufData() {
+ if (bufRead != buf.limit()) {
+ throw new IllegalStateException("bufRead should be same as " +
+ "buf.limit()");
+ }
+
+ //shift the remaining data on buf to the front
+ if (buf.position() > 0) {
+ int dataLeft = buf.remaining();
+ if (dataLeft > 0) {
+ byte[] b = buf.array();
+ System.arraycopy(b, buf.position(), b, 0, dataLeft);
+ }
+ buf.position(0);
+ bufRead = dataLeft;
+ buf.limit(bufRead);
+ }
+ }
+
+ /**
+   * Reads up to toRead bytes into buf at buf.limit() and increments the limit.
+   * Throws an IOException if the read does not succeed.
+ */
+ private int readToBuf(int toRead) throws IOException {
+ if (toRead < 0) {
+ toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
+ - buf.limit();
+ }
+
+ int nRead = in.read(buf.array(), buf.limit(), toRead);
+
+ if (nRead < 0) {
+ throw new EOFException("while trying to read " + toRead + " bytes");
+ }
+ bufRead = buf.limit() + nRead;
+ buf.limit(bufRead);
+ return nRead;
+ }
+
+
+ /**
+ * Reads (at least) one packet and returns the packet length.
+ * buf.position() points to the start of the packet and
+   * buf.limit() points to the end of the packet. There could
+   * be more data from the next packet in buf.<br><br>
+   *
+   * It tries to read a full packet with a single read call.
+ * Consecutive packets are usually of the same length.
+ */
+ private int readNextPacket() throws IOException {
+    /* This dances around buf a little bit, mainly to read a
+     * full packet with a single read and to accept an arbitrary size
+     * for the next packet at the same time.
+ */
+ if (buf == null) {
+ /* initialize buffer to the best guess size:
+ * 'chunksPerPacket' calculation here should match the same
+ * calculation in DFSClient to make the guess accurate.
+ */
+ int chunkSize = bytesPerChecksum + checksumSize;
+      int chunksPerPacket = (datanode.writePacketSize - DataNode.PKT_HEADER_LEN -
+                             SIZE_OF_INTEGER + chunkSize - 1)/chunkSize;
+ buf = ByteBuffer.allocate(DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
+ Math.max(chunksPerPacket, 1) * chunkSize);
+ buf.limit(0);
+ }
+
+ // See if there is data left in the buffer :
+ if (bufRead > buf.limit()) {
+ buf.limit(bufRead);
+ }
+
+ while (buf.remaining() < SIZE_OF_INTEGER) {
+ if (buf.position() > 0) {
+ shiftBufData();
+ }
+ readToBuf(-1);
+ }
+
+ /* We mostly have the full packet or at least enough for an int
+ */
+ buf.mark();
+ int payloadLen = buf.getInt();
+ buf.reset();
+
+ if (payloadLen == 0) {
+ //end of stream!
+ buf.limit(buf.position() + SIZE_OF_INTEGER);
+ return 0;
+ }
+
+ // check corrupt values for pktLen, 100MB upper limit should be ok?
+ if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
+ throw new IOException("Incorrect value for packet payload : " +
+ payloadLen);
+ }
+
+ int pktSize = payloadLen + DataNode.PKT_HEADER_LEN;
+
+ if (buf.remaining() < pktSize) {
+ //we need to read more data
+ int toRead = pktSize - buf.remaining();
+
+ // first make sure buf has enough space.
+ int spaceLeft = buf.capacity() - buf.limit();
+ if (toRead > spaceLeft && buf.position() > 0) {
+ shiftBufData();
+ spaceLeft = buf.capacity() - buf.limit();
+ }
+ if (toRead > spaceLeft) {
+ byte oldBuf[] = buf.array();
+ int toCopy = buf.limit();
+ buf = ByteBuffer.allocate(toCopy + toRead);
+ System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
+ buf.limit(toCopy);
+ }
+
+ //now read:
+ while (toRead > 0) {
+ toRead -= readToBuf(toRead);
+ }
+ }
+
+ if (buf.remaining() > pktSize) {
+ buf.limit(buf.position() + pktSize);
+ }
+
+ if (pktSize > maxPacketReadLen) {
+ maxPacketReadLen = pktSize;
+ }
+
+ return payloadLen;
+ }
+
+ /**
+   * Receives and processes a packet. It can contain many chunks.
+   * Returns the size of the packet.
+ */
+ private int receivePacket() throws IOException {
+
+ int payloadLen = readNextPacket();
+
+ if (payloadLen <= 0) {
+ return payloadLen;
+ }
+
+ buf.mark();
+ //read the header
+ buf.getInt(); // packet length
+ offsetInBlock = buf.getLong(); // get offset of packet in block
+ long seqno = buf.getLong(); // get seqno
+ boolean lastPacketInBlock = (buf.get() != 0);
+
+ int endOfHeader = buf.position();
+ buf.reset();
+
+ if (LOG.isDebugEnabled()){
+ LOG.debug("Receiving one packet for block " + block +
+ " of length " + payloadLen +
+ " seqno " + seqno +
+ " offsetInBlock " + offsetInBlock +
+ " lastPacketInBlock " + lastPacketInBlock);
+ }
+
+ setBlockPosition(offsetInBlock);
+
+ //First write the packet to the mirror:
+ if (mirrorOut != null) {
+ try {
+ mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+ mirrorOut.flush();
+ } catch (IOException e) {
+ handleMirrorOutError(e);
+ }
+ }
+
+ buf.position(endOfHeader);
+ int len = buf.getInt();
+
+ if (len < 0) {
+ throw new IOException("Got wrong length during writeBlock(" + block +
+ ") from " + inAddr + " at offset " +
+ offsetInBlock + ": " + len);
+ }
+
+ if (len == 0) {
+ LOG.debug("Receiving empty packet for block " + block);
+ } else {
+ offsetInBlock += len;
+
+ int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
+ checksumSize;
+
+ if ( buf.remaining() != (checksumLen + len)) {
+ throw new IOException("Data remaining in packet does not match " +
+ "sum of checksumLen and dataLen");
+ }
+ int checksumOff = buf.position();
+ int dataOff = checksumOff + checksumLen;
+ byte pktBuf[] = buf.array();
+
+ buf.position(buf.limit()); // move to the end of the data.
+
+ /* skip verifying checksum iff this is not the last one in the
+ * pipeline and clientName is non-null. i.e. Checksum is verified
+ * on all the datanodes when the data is being written by a
+       * datanode rather than a client. When the client is writing the data,
+ * protocol includes acks and only the last datanode needs to verify
+ * checksum.
+ */
+ if (mirrorOut == null || clientName.length() == 0) {
+ verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+ }
+
+ try {
+ if (!finalized) {
+ //finally write to the disk :
+ out.write(pktBuf, dataOff, len);
+
+ // If this is a partial chunk, then verify that this is the only
+ // chunk in the packet. Calculate new crc for this chunk.
+ if (partialCrc != null) {
+ if (len > bytesPerChecksum) {
+ throw new IOException("Got wrong length during writeBlock(" +
+ block + ") from " + inAddr + " " +
+                                  "A packet can have only one partial chunk." +
+ " len = " + len +
+ " bytesPerChecksum " + bytesPerChecksum);
+ }
+ partialCrc.update(pktBuf, dataOff, len);
+          byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
+ checksumOut.write(buf);
+ LOG.debug("Writing out partial crc for data len " + len);
+ partialCrc = null;
+ } else {
+ checksumOut.write(pktBuf, checksumOff, checksumLen);
+ }
+ datanode.myMetrics.bytesWritten.inc(len);
+ }
+ } catch (IOException iex) {
+ datanode.checkDiskError(iex);
+ throw iex;
+ }
+ }
+
+ /// flush entire packet before sending ack
+ flush();
+
+ // put in queue for pending acks
+ if (responder != null) {
+ ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+ lastPacketInBlock);
+ }
+
+ if (throttler != null) { // throttle I/O
+ throttler.throttle(payloadLen);
+ }
+
+ return payloadLen;
+ }
+
+ void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
+ checksum.writeHeader(mirrorOut);
+ }
+
+
+ void receiveBlock(
+ DataOutputStream mirrOut, // output to next datanode
+ DataInputStream mirrIn, // input from next datanode
+ DataOutputStream replyOut, // output to previous datanode
+ String mirrAddr, BlockTransferThrottler throttlerArg,
+ int numTargets) throws IOException {
+
+ mirrorOut = mirrOut;
+ mirrorAddr = mirrAddr;
+ throttler = throttlerArg;
+
+ try {
+ // write data chunk header
+ if (!finalized) {
+ BlockMetadataHeader.writeHeader(checksumOut, checksum);
+ }
+ if (clientName.length() > 0) {
+ responder = new Daemon(datanode.threadGroup,
+ new PacketResponder(this, block, mirrIn,
+ replyOut, numTargets,
+ clientName));
+        responder.start(); // start thread to process responses
+ }
+
+ /*
+ * Receive until packet length is zero.
+ */
+ while (receivePacket() > 0) {}
+
+ // flush the mirror out
+ if (mirrorOut != null) {
+ try {
+ mirrorOut.writeInt(0); // mark the end of the block
+ mirrorOut.flush();
+ } catch (IOException e) {
+ handleMirrorOutError(e);
+ }
+ }
+
+ // wait for all outstanding packet responses. And then
+ // indicate responder to gracefully shutdown.
+ if (responder != null) {
+ ((PacketResponder)responder.getRunnable()).close();
+ }
+
+ // if this write is for a replication request (and not
+ // from a client), then finalize block. For client-writes,
+ // the block is finalized in the PacketResponder.
+ if (clientName.length() == 0) {
+ // close the block/crc files
+ close();
+
+ // Finalize the block. Does this fsync()?
+ block.setNumBytes(offsetInBlock);
+ datanode.data.finalizeBlock(block);
+ datanode.myMetrics.blocksWritten.inc();
+ }
+
+ } catch (IOException ioe) {
+ LOG.info("Exception in receiveBlock for block " + block +
+ " " + ioe);
+ IOUtils.closeStream(this);
+ if (responder != null) {
+ responder.interrupt();
+ }
+ throw ioe;
+ } finally {
+ if (responder != null) {
+ try {
+ responder.join();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted receiveBlock");
+ }
+ responder = null;
+ }
+ }
+ }
+
+ /**
+ * Sets the file pointer in the local block file to the specified value.
+ */
+ private void setBlockPosition(long offsetInBlock) throws IOException {
+ if (finalized) {
+ if (!isRecovery) {
+ throw new IOException("Write to offset " + offsetInBlock +
+ " of block " + block +
+ " that is already finalized.");
+ }
+ if (offsetInBlock > datanode.data.getLength(block)) {
+ throw new IOException("Write to offset " + offsetInBlock +
+ " of block " + block +
+ " that is already finalized and is of size " +
+ datanode.data.getLength(block));
+ }
+ return;
+ }
+
+ if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
+ return; // nothing to do
+ }
+ long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+ offsetInBlock / bytesPerChecksum * checksumSize;
+ if (out != null) {
+ out.flush();
+ }
+ if (checksumOut != null) {
+ checksumOut.flush();
+ }
+
+ // If this is a partial chunk, then read in pre-existing checksum
+ if (offsetInBlock % bytesPerChecksum != 0) {
+ LOG.info("setBlockPosition trying to set position to " +
+ offsetInBlock +
+ " for block " + block +
+ " which is not a multiple of bytesPerChecksum " +
+ bytesPerChecksum);
+      computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
+ }
+
+ LOG.info("Changing block file offset of block " + block + " from " +
+ datanode.data.getChannelPosition(block, streams) +
+ " to " + offsetInBlock +
+ " meta file offset to " + offsetInChecksum);
+
+ // set the position of the block file
+    datanode.data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
+ }
+
+ /**
+ * reads in the partial crc chunk and computes checksum
+ * of pre-existing data in partial chunk.
+ */
+ private void computePartialChunkCrc(long blkoff, long ckoff,
+                                      int bytesPerChecksum) throws IOException {
+
+ // find offset of the beginning of partial chunk.
+ //
+ int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
+ int checksumSize = checksum.getChecksumSize();
+ blkoff = blkoff - sizePartialChunk;
+ LOG.info("computePartialChunkCrc sizePartialChunk " +
+ sizePartialChunk +
+ " block " + block +
+ " offset in block " + blkoff +
+ " offset in metafile " + ckoff);
+
+ // create an input stream from the block file
+ // and read in partial crc chunk into temporary buffer
+ //
+ byte[] buf = new byte[sizePartialChunk];
+ byte[] crcbuf = new byte[checksumSize];
+ FSDataset.BlockInputStreams instr = null;
+ try {
+ instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
+ IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
+
+      // open meta file and read in crc value computed earlier
+ IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
+ } finally {
+ IOUtils.closeStream(instr);
+ }
+
+ // compute crc of partial chunk from data read in the block file.
+ partialCrc = new CRC32();
+ partialCrc.update(buf, 0, sizePartialChunk);
+ LOG.info("Read in partial CRC chunk from disk for block " + block);
+
+ // paranoia! verify that the pre-computed crc matches what we
+ // recalculated just now
+ if (partialCrc.getValue() != FSInputChecker.checksum2long(crcbuf)) {
+ String msg = "Partial CRC " + partialCrc.getValue() +
+ " does not match value computed the " +
+ " last time file was closed " +
+ FSInputChecker.checksum2long(crcbuf);
+ throw new IOException(msg);
+ }
+ //LOG.debug("Partial CRC matches 0x" +
+ // Long.toHexString(partialCrc.getValue()));
+ }
+
+
+ /**
+   * Processes responses from downstream datanodes in the pipeline
+ * and sends back replies to the originator.
+ */
+ class PacketResponder implements Runnable, FSConstants {
+
+ //packet waiting for ack
+ private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
+ private volatile boolean running = true;
+ private Block block;
+ DataInputStream mirrorIn; // input from downstream datanode
+ DataOutputStream replyOut; // output to upstream datanode
+    private int numTargets; // number of downstream datanodes including myself
+ private String clientName; // The name of the client (if any)
+ private BlockReceiver receiver; // The owner of this responder.
+
+ public String toString() {
+ return "PacketResponder " + numTargets + " for Block " + this.block;
+ }
+
+ PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
+ DataOutputStream out, int numTargets, String clientName) {
+ this.receiver = receiver;
+ this.block = b;
+ mirrorIn = in;
+ replyOut = out;
+ this.numTargets = numTargets;
+ this.clientName = clientName;
+ }
+
+ /**
+     * enqueue the seqno that is still to be acked by the downstream datanode.
+ * @param seqno
+ * @param lastPacketInBlock
+ */
+ synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
+ if (running) {
+ LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
+ " to ack queue.");
+ ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
+ notifyAll();
+ }
+ }
+
+ /**
+ * wait for all pending packets to be acked. Then shutdown thread.
+ */
+ synchronized void close() {
+ while (running && ackQueue.size() != 0 && datanode.shouldRun) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ running = false;
+ }
+ }
+ LOG.debug("PacketResponder " + numTargets +
+ " for block " + block + " Closing down.");
+ running = false;
+ notifyAll();
+ }
+
+ private synchronized void lastDataNodeRun() {
+ long lastHeartbeat = System.currentTimeMillis();
+ boolean lastPacket = false;
+
+ while (running && datanode.shouldRun && !lastPacket) {
+ long now = System.currentTimeMillis();
+ try {
+
+ // wait for a packet to be sent to downstream datanode
+ while (running && datanode.shouldRun && ackQueue.size() == 0) {
+ long idle = now - lastHeartbeat;
+ long timeout = (datanode.socketTimeout/2) - idle;
+ if (timeout <= 0) {
+ timeout = 1000;
+ }
+ try {
+ wait(timeout);
+ } catch (InterruptedException e) {
+ if (running) {
+ LOG.info("PacketResponder " + numTargets +
+ " for block " + block + " Interrupted.");
+ running = false;
+ }
+ break;
+ }
+
+ // send a heartbeat if it is time.
+ now = System.currentTimeMillis();
+ if (now - lastHeartbeat > datanode.socketTimeout/2) {
+ replyOut.writeLong(-1); // send heartbeat
+ replyOut.flush();
+ lastHeartbeat = now;
+ }
+ }
+
+ if (!running || !datanode.shouldRun) {
+ break;
+ }
+ Packet pkt = ackQueue.removeFirst();
+ long expected = pkt.seqno;
+ notifyAll();
+ LOG.debug("PacketResponder " + numTargets +
+ " for block " + block +
+ " acking for packet " + expected);
+
+ // If this is the last packet in block, then close block
+ // file and finalize the block before responding success
+ if (pkt.lastPacketInBlock) {
+ if (!receiver.finalized) {
+ receiver.close();
+ block.setNumBytes(receiver.offsetInBlock);
+ datanode.data.finalizeBlock(block);
+ datanode.myMetrics.blocksWritten.inc();
+ datanode.notifyNamenodeReceivedBlock(block,
+ DataNode.EMPTY_DEL_HINT);
+ LOG.info("Received block " + block +
+ " of size " + block.getNumBytes() +
+ " from " + receiver.inAddr);
+ }
+ lastPacket = true;
+ }
+
+ replyOut.writeLong(expected);
+ replyOut.writeShort(OP_STATUS_SUCCESS);
+ replyOut.flush();
+ } catch (Exception e) {
+ if (running) {
+ LOG.info("PacketResponder " + block + " " + numTargets +
+ " Exception " + StringUtils.stringifyException(e));
+ running = false;
+ }
+ }
+ }
+ LOG.info("PacketResponder " + numTargets +
+ " for block " + block + " terminating");
+ }
+
+ /**
+ * Thread to process incoming acks.
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+
+ // If this is the last datanode in pipeline, then handle differently
+ if (numTargets == 0) {
+ lastDataNodeRun();
+ return;
+ }
+
+ boolean lastPacketInBlock = false;
+ while (running && datanode.shouldRun && !lastPacketInBlock) {
+
+ try {
+ short op = OP_STATUS_SUCCESS;
+ boolean didRead = false;
+ long expected = -2;
+ try {
+ // read seqno from downstream datanode
+ long seqno = mirrorIn.readLong();
+ didRead = true;
+ if (seqno == -1) {
+ replyOut.writeLong(-1); // send keepalive
+ replyOut.flush();
+ LOG.debug("PacketResponder " + numTargets + " got -1");
+ continue;
+ } else if (seqno == -2) {
+ LOG.debug("PacketResponder " + numTargets + " got -2");
+ } else {
+ LOG.debug("PacketResponder " + numTargets + " got seqno = " +
+ seqno);
+ Packet pkt = null;
+ synchronized (this) {
+                while (running && datanode.shouldRun && ackQueue.size() == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets +
+ " seqno = " + seqno +
+ " for block " + block +
+                              " waiting for local datanode to finish write.");
+ }
+ wait();
+ }
+ pkt = ackQueue.removeFirst();
+ expected = pkt.seqno;
+ notifyAll();
+                LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
+ if (seqno != expected) {
+ throw new IOException("PacketResponder " + numTargets +
+ " for block " + block +
+ " expected seqno:" + expected +
+ " received:" + seqno);
+ }
+ lastPacketInBlock = pkt.lastPacketInBlock;
+ }
+ }
+ } catch (Throwable e) {
+ if (running) {
+ LOG.info("PacketResponder " + block + " " + numTargets +
+ " Exception " + StringUtils.stringifyException(e));
+ running = false;
+ }
+ }
+
+ if (Thread.interrupted()) {
+ /* The receiver thread cancelled this thread.
+ * We could also check any other status updates from the
+ * receiver thread (e.g. if it is ok to write to replyOut).
+ * It is prudent to not send any more status back to the client
+ * because this datanode has a problem. The upstream datanode
+             * will detect a timeout on heartbeats and will declare that
+ * this datanode is bad, and rightly so.
+ */
+ LOG.info("PacketResponder " + block + " " + numTargets +
+ " : Thread is interrupted.");
+ running = false;
+ continue;
+ }
+
+ if (!didRead) {
+ op = OP_STATUS_ERROR;
+ }
+
+ // If this is the last packet in block, then close block
+ // file and finalize the block before responding success
+ if (lastPacketInBlock && !receiver.finalized) {
+ receiver.close();
+ block.setNumBytes(receiver.offsetInBlock);
+ datanode.data.finalizeBlock(block);
+ datanode.myMetrics.blocksWritten.inc();
+ datanode.notifyNamenodeReceivedBlock(block,
+ DataNode.EMPTY_DEL_HINT);
+ LOG.info("Received block " + block +
+ " of size " + block.getNumBytes() +
+ " from " + receiver.inAddr);
+ }
+
+ // send my status back to upstream datanode
+ replyOut.writeLong(expected); // send seqno upstream
+ replyOut.writeShort(OP_STATUS_SUCCESS);
+
+ LOG.debug("PacketResponder " + numTargets +
+ " for block " + block +
+ " responded my status " +
+ " for seqno " + expected);
+
+ // forward responses from downstream datanodes.
+ for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
+ try {
+ if (op == OP_STATUS_SUCCESS) {
+ op = mirrorIn.readShort();
+ if (op != OP_STATUS_SUCCESS) {
+ LOG.debug("PacketResponder for block " + block +
+ ": error code received from downstream " +
+ " datanode[" + i + "] " + op);
+ }
+ }
+ } catch (Throwable e) {
+ op = OP_STATUS_ERROR;
+ }
+ replyOut.writeShort(op);
+ }
+ replyOut.flush();
+ LOG.debug("PacketResponder " + block + " " + numTargets +
+ " responded other status " + " for seqno " + expected);
+
+ // If we were unable to read the seqno from downstream, then stop.
+ if (expected == -2) {
+ running = false;
+ }
+ // If we forwarded an error response from a downstream datanode
+ // and we are acting on behalf of a client, then we quit. The
+ // client will drive the recovery mechanism.
+ if (op == OP_STATUS_ERROR && clientName.length() > 0) {
+ running = false;
+ }
+ } catch (IOException e) {
+ if (running) {
+ LOG.info("PacketResponder " + block + " " + numTargets +
+ " Exception " + StringUtils.stringifyException(e));
+ running = false;
+ }
+ } catch (RuntimeException e) {
+ if (running) {
+ LOG.info("PacketResponder " + block + " " + numTargets +
+ " Exception " + StringUtils.stringifyException(e));
+ running = false;
+ }
+ }
+ }
+ LOG.info("PacketResponder " + numTargets +
+ " for block " + block + " terminating");
+ }
+ }
+
+ /**
+ * This information is cached by the Datanode in the ackQueue.
+ */
+ static private class Packet {
+ long seqno;
+ boolean lastPacketInBlock;
+
+ Packet(long seqno, boolean lastPacketInBlock) {
+ this.seqno = seqno;
+ this.lastPacketInBlock = lastPacketInBlock;
+ }
+ }
+}
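For reference, the on-the-wire packet framing that readNextPacket()/receivePacket() above parse (and that BlockSender.sendChunks() below produces) can be sketched as follows. This is an illustrative sketch only, not part of the commit; the class and method names are hypothetical, and the field layout is taken directly from the reads and writes in the code above.

// Hypothetical sketch of the packet layout expected by BlockReceiver.
import java.io.DataOutputStream;
import java.io.IOException;

class PacketFramingSketch {
  // payloadLen (the first int) = 4 (dataLen field) + checksums + data,
  // matching "packetLen = len + numChunks*checksumSize + 4" in BlockSender.
  static void writePacket(DataOutputStream out, long offsetInBlock, long seqno,
                          boolean lastPacketInBlock, byte[] checksums,
                          byte[] data, int dataLen) throws IOException {
    int payloadLen = 4 + checksums.length + dataLen;
    out.writeInt(payloadLen);                 // packet length, read by readNextPacket()
    out.writeLong(offsetInBlock);             // offset of this packet in the block
    out.writeLong(seqno);                     // sequence number
    out.writeByte(lastPacketInBlock ? 1 : 0); // last-packet-in-block flag
    out.writeInt(dataLen);                    // length of the data portion
    out.write(checksums);                     // one checksum per bytesPerChecksum chunk
    out.write(data, 0, dataLen);              // the data itself
  }
}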
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=685979&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Aug 14 10:58:30 2008
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Reads a block from the disk and sends it to a recipient.
+ */
+class BlockSender implements java.io.Closeable, FSConstants {
+ public static final Log LOG = DataNode.LOG;
+
+ private Block block; // the block to read from
+ private InputStream blockIn; // data stream
+ private long blockInPosition = -1; // updated while using transferTo().
+ private DataInputStream checksumIn; // checksum datastream
+ private DataChecksum checksum; // checksum stream
+ private long offset; // starting position to read
+ private long endOffset; // ending position
+ private long blockLength;
+ private int bytesPerChecksum; // chunk size
+ private int checksumSize; // checksum size
+ private boolean corruptChecksumOk; // if need to verify checksum
+ private boolean chunkOffsetOK; // if need to send chunk offset
+ private long seqno; // sequence number of packet
+
+ private boolean transferToAllowed = true;
+ private boolean blockReadFully; //set when the whole block is read
+  private boolean verifyChecksum; //if true, checksum is verified while reading
+ private BlockTransferThrottler throttler;
+
+ /**
+ * Minimum buffer used while sending data to clients. Used only if
+ * transferTo() is enabled. 64KB is not that large. It could be larger, but
+ * not sure if there will be much more improvement.
+ */
+ private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+
+
+ BlockSender(Block block, long startOffset, long length,
+ boolean corruptChecksumOk, boolean chunkOffsetOK,
+ boolean verifyChecksum, DataNode datanode) throws IOException {
+
+ try {
+ this.block = block;
+ this.chunkOffsetOK = chunkOffsetOK;
+ this.corruptChecksumOk = corruptChecksumOk;
+ this.verifyChecksum = verifyChecksum;
+ this.blockLength = datanode.data.getLength(block);
+ this.transferToAllowed = datanode.transferToAllowed;
+
+ if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
+ checksumIn = new DataInputStream(
+            new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
+ BUFFER_SIZE));
+
+ // read and handle the common header here. For now just a version
+ BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+ short version = header.getVersion();
+
+ if (version != FSDataset.METADATA_VERSION) {
+ LOG.warn("Wrong version (" + version + ") for metadata file for "
+ + block + " ignoring ...");
+ }
+ checksum = header.getChecksum();
+ } else {
+ LOG.warn("Could not find metadata file for " + block);
+ // This only decides the buffer size. Use BUFFER_SIZE?
+ checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
+ 16 * 1024);
+ }
+
+ /* If bytesPerChecksum is very large, then the metadata file
+       * is most likely corrupted. For now just truncate bytesPerChecksum to
+ * blockLength.
+ */
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+ checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+ Math.max((int)blockLength, 10*1024*1024));
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ }
+ checksumSize = checksum.getChecksumSize();
+
+ if (length < 0) {
+ length = blockLength;
+ }
+
+ endOffset = blockLength;
+ if (startOffset < 0 || startOffset > endOffset
+ || (length + startOffset) > endOffset) {
+ String msg = " Offset " + startOffset + " and length " + length
+ + " don't match block " + block + " ( blockLen " + endOffset + " )";
+ LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg);
+ throw new IOException(msg);
+ }
+
+
+ offset = (startOffset - (startOffset % bytesPerChecksum));
+ if (length >= 0) {
+ // Make sure endOffset points to end of a checksumed chunk.
+ long tmpLen = startOffset + length + (startOffset - offset);
+ if (tmpLen % bytesPerChecksum != 0) {
+ tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+ }
+ if (tmpLen < endOffset) {
+ endOffset = tmpLen;
+ }
+ }
+
+ // seek to the right offsets
+ if (offset > 0) {
+ long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+ // note blockInStream is seeked when created below
+ if (checksumSkip > 0) {
+ // Should we use seek() for checksum file as well?
+ IOUtils.skipFully(checksumIn, checksumSkip);
+ }
+ }
+ seqno = 0;
+
+      blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+ } catch (IOException ioe) {
+ IOUtils.closeStream(this);
+ IOUtils.closeStream(blockIn);
+ throw ioe;
+ }
+ }
+
+ /**
+ * close opened files.
+ */
+ public void close() throws IOException {
+ IOException ioe = null;
+ // close checksum file
+ if(checksumIn!=null) {
+ try {
+ checksumIn.close();
+ } catch (IOException e) {
+ ioe = e;
+ }
+ checksumIn = null;
+ }
+ // close data file
+ if(blockIn!=null) {
+ try {
+ blockIn.close();
+ } catch (IOException e) {
+ ioe = e;
+ }
+ blockIn = null;
+ }
+ // throw IOException if there is any
+ if(ioe!= null) {
+ throw ioe;
+ }
+ }
+
+ /**
+   * Sends up to maxChunks chunks of data.
+ *
+ * When blockInPosition is >= 0, assumes 'out' is a
+   * {@link SocketOutputStream} and tries
+   * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
+ * send data (and updates blockInPosition).
+ */
+ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
+ throws IOException {
+ // Sends multiple chunks in one packet with a single write().
+
+ int len = Math.min((int) (endOffset - offset),
+ bytesPerChecksum*maxChunks);
+ if (len == 0) {
+ return 0;
+ }
+
+ int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
+ int packetLen = len + numChunks*checksumSize + 4;
+ pkt.clear();
+
+ // write packet header
+ pkt.putInt(packetLen);
+ pkt.putLong(offset);
+ pkt.putLong(seqno);
+ pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+ //why no ByteBuf.putBoolean()?
+ pkt.putInt(len);
+
+ int checksumOff = pkt.position();
+ int checksumLen = numChunks * checksumSize;
+ byte[] buf = pkt.array();
+
+ if (checksumSize > 0 && checksumIn != null) {
+ try {
+ checksumIn.readFully(buf, checksumOff, checksumLen);
+ } catch (IOException e) {
+        LOG.warn(" Could not read or failed to verify checksum for data" +
+ " at offset " + offset + " for block " + block + " got : "
+ + StringUtils.stringifyException(e));
+ IOUtils.closeStream(checksumIn);
+ checksumIn = null;
+ if (corruptChecksumOk) {
+ // Just fill the array with zeros.
+ Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
+          Arrays.fill(buf, checksumOff, checksumOff + checksumLen, (byte) 0);
+ throw e;
+ }
+ }
+ }
+
+ int dataOff = checksumOff + checksumLen;
+
+ if (blockInPosition < 0) {
+ //normal transfer
+ IOUtils.readFully(blockIn, buf, dataOff, len);
+
+ if (verifyChecksum) {
+ int dOff = dataOff;
+ int cOff = checksumOff;
+ int dLeft = len;
+
+ for (int i=0; i<numChunks; i++) {
+ checksum.reset();
+ int dLen = Math.min(dLeft, bytesPerChecksum);
+ checksum.update(buf, dOff, dLen);
+ if (!checksum.compare(buf, cOff)) {
+ throw new ChecksumException("Checksum failed at " +
+ (offset + len - dLeft), len);
+ }
+ dLeft -= dLen;
+ dOff += dLen;
+ cOff += checksumSize;
+ }
+ }
+ //writing is done below (mainly to handle IOException)
+ }
+
+ try {
+ if (blockInPosition >= 0) {
+ //use transferTo(). Checks on out and blockIn are already done.
+
+ SocketOutputStream sockOut = (SocketOutputStream)out;
+ //first write the packet
+ sockOut.write(buf, 0, dataOff);
+ // no need to flush. since we know out is not a buffered stream.
+
+ sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
+ blockInPosition, len);
+
+ blockInPosition += len;
+ } else {
+ // normal transfer
+ out.write(buf, 0, dataOff + len);
+ }
+
+ } catch (IOException e) {
+ /* exception while writing to the client (well, with transferTo(),
+ * it could also be while reading from the local file). Many times
+ * this error can be ignored. We will let the callers distinguish this
+ * from other exceptions if this is not a subclass of IOException.
+ */
+ if (e.getClass().equals(IOException.class)) {
+        // "se" could be a new class instead of SocketException.
+ IOException se = new SocketException("Original Exception : " + e);
+ se.initCause(e);
+        /* Change the stacktrace so that original trace is not truncated
+ * when printed.*/
+ se.setStackTrace(e.getStackTrace());
+ throw se;
+ }
+ throw e;
+ }
+
+ if (throttler != null) { // rebalancing so throttle
+ throttler.throttle(packetLen);
+ }
+
+ return len;
+ }
+
+ /**
+ * sendBlock() is used to read block and its metadata and stream the data to
+ * either a client or to another datanode.
+ *
+ * @param out stream to which the block is written to
+ * @param baseStream optional. if non-null, <code>out</code> is assumed to
+ * be a wrapper over this stream. This enables optimizations for
+ * sending the data, e.g.
+   *        {@link SocketOutputStream#transferToFully(FileChannel,
+ * long, int)}.
+ * @param throttler for sending data.
+   * @return total bytes read, including crc.
+ */
+ long sendBlock(DataOutputStream out, OutputStream baseStream,
+ BlockTransferThrottler throttler) throws IOException {
+ if( out == null ) {
+ throw new IOException( "out stream is null" );
+ }
+ this.throttler = throttler;
+
+ long initialOffset = offset;
+ long totalRead = 0;
+ OutputStream streamForSendChunks = out;
+
+ try {
+ checksum.writeHeader(out);
+ if ( chunkOffsetOK ) {
+ out.writeLong( offset );
+ }
+ out.flush();
+
+ int maxChunksPerPacket;
+ int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+
+ if (transferToAllowed && !verifyChecksum &&
+ baseStream instanceof SocketOutputStream &&
+ blockIn instanceof FileInputStream) {
+
+ FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+
+ // blockInPosition also indicates sendChunks() uses transferTo.
+ blockInPosition = fileChannel.position();
+ streamForSendChunks = baseStream;
+
+        // assure a minimum buffer size.
+ maxChunksPerPacket = (Math.max(BUFFER_SIZE,
+ MIN_BUFFER_WITH_TRANSFERTO)
+ + bytesPerChecksum - 1)/bytesPerChecksum;
+
+ // allocate smaller buffer while using transferTo().
+ pktSize += checksumSize * maxChunksPerPacket;
+ } else {
+ maxChunksPerPacket = Math.max(1,
+ (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+ pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+ }
+
+ ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
+
+ while (endOffset > offset) {
+ long len = sendChunks(pktBuf, maxChunksPerPacket,
+ streamForSendChunks);
+ offset += len;
+ totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
+ checksumSize);
+ seqno++;
+ }
+ out.writeInt(0); // mark the end of block
+ out.flush();
+ } finally {
+ close();
+ }
+
+ blockReadFully = (initialOffset == 0 && offset >= blockLength);
+
+ return totalRead;
+ }
+
+ boolean isBlockReadFully() {
+ return blockReadFully;
+ }
+}
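As a rough illustration of how this class is driven, the following sketch mirrors the calling pattern visible in the DataBlockScanner hunk at the end of this commit: construct a BlockSender over a whole block and stream it into a sink with verification enabled. It is a hypothetical fragment, not part of the commit; the variables block, datanode and throttler are assumed to come from the caller's context.

// Hypothetical caller sketch, not part of this commit.
BlockSender blockSender = new BlockSender(block, 0, -1, false, false, true, datanode);
DataOutputStream out = new DataOutputStream(new IOUtils.NullOutputStream());
try {
  // Reads the block and its checksums, verifying them, and returns bytes read.
  long totalRead = blockSender.sendBlock(out, null, throttler);
} finally {
  IOUtils.closeStream(blockSender);
}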
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java?rev=685979&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java Thu Aug 14 10:58:30 2008
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+/**
+ * A class to throttle block transfers.
+ * This class is thread safe. It can be shared by multiple threads.
+ * The parameter bandwidthPerSec specifies the total bandwidth shared by
+ * threads.
+ */
+class BlockTransferThrottler {
+ private long period; // period over which bw is imposed
+ private long periodExtension; // Max period over which bw accumulates.
+  private long bytesPerPeriod; // total number of bytes that can be sent in each period
+ private long curPeriodStart; // current period starting time
+  private long curReserve; // remaining bytes that can be sent in the period
+ private long bytesAlreadyUsed;
+
+ /** Constructor
+ * @param bandwidthPerSec bandwidth allowed in bytes per second.
+ */
+ BlockTransferThrottler(long bandwidthPerSec) {
+ this(500, bandwidthPerSec); // by default throttling period is 500ms
+ }
+
+ /**
+ * Constructor
+ * @param period in milliseconds. Bandwidth is enforced over this
+ * period.
+ * @param bandwidthPerSec bandwidth allowed in bytes per second.
+ */
+ BlockTransferThrottler(long period, long bandwidthPerSec) {
+ this.curPeriodStart = System.currentTimeMillis();
+ this.period = period;
+ this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
+ this.periodExtension = period*3;
+ }
+
+ /**
+ * @return current throttle bandwidth in bytes per second.
+ */
+ synchronized long getBandwidth() {
+ return bytesPerPeriod*1000/period;
+ }
+
+ /**
+   * Sets throttle bandwidth. This takes effect by the end of the current
+   * period at the latest.
+ *
+ * @param bytesPerSecond
+ */
+ synchronized void setBandwidth(long bytesPerSecond) {
+ if ( bytesPerSecond <= 0 ) {
+ throw new IllegalArgumentException("" + bytesPerSecond);
+ }
+ bytesPerPeriod = bytesPerSecond*period/1000;
+ }
+
+ /** Given the numOfBytes sent/received since last time throttle was called,
+ * make the current thread sleep if I/O rate is too fast
+ * compared to the given bandwidth.
+ *
+ * @param numOfBytes
+ * number of bytes sent/received since last time throttle was called
+ */
+ synchronized void throttle(long numOfBytes) {
+ if ( numOfBytes <= 0 ) {
+ return;
+ }
+
+ curReserve -= numOfBytes;
+ bytesAlreadyUsed += numOfBytes;
+
+ while (curReserve <= 0) {
+ long now = System.currentTimeMillis();
+ long curPeriodEnd = curPeriodStart + period;
+
+ if ( now < curPeriodEnd ) {
+ // Wait for next period so that curReserve can be increased.
+ try {
+ wait( curPeriodEnd - now );
+ } catch (InterruptedException ignored) {}
+ } else if ( now < (curPeriodStart + periodExtension)) {
+ curPeriodStart = curPeriodEnd;
+ curReserve += bytesPerPeriod;
+ } else {
+ // discard the prev period. Throttler might not have
+ // been used for a long time.
+ curPeriodStart = now;
+ curReserve = bytesPerPeriod - bytesAlreadyUsed;
+ }
+ }
+
+ bytesAlreadyUsed -= numOfBytes;
+ }
+}
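To make the intended use of the throttler concrete, here is a brief hypothetical sketch (not part of this commit): a sender sharing a fixed bandwidth budget calls throttle() after each transfer, and the call sleeps whenever the bytes sent so far exceed the allowance for the current 500 ms period.

// Hypothetical usage sketch of BlockTransferThrottler.
BlockTransferThrottler throttler = new BlockTransferThrottler(1024 * 1024); // ~1 MB/s
while (hasMoreData()) {            // hasMoreData()/sendNextPacket() are assumed helpers
  int bytesSent = sendNextPacket();
  throttler.throttle(bytesSent);   // sleeps if the period's byte budget is exhausted
}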
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=685979&r1=685978&r2=685979&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Thu Aug 14 10:58:30 2008
@@ -49,7 +49,6 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockSender;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -95,7 +94,7 @@
   Random random = new Random();
-  DataNode.Throttler throttler = null;
+  BlockTransferThrottler throttler = null;
   private static enum ScanType {
     REMOTE_READ,              // Verified when a block read by a client etc
@@ -239,7 +238,7 @@
     }
     synchronized (this) {
-      throttler = new DataNode.Throttler(200, MAX_SCAN_RATE);
+      throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
     }
   }
@@ -424,8 +423,8 @@
     try {
       adjustThrottler();
-      blockSender = datanode.new BlockSender(block, 0, -1, false,
-                                             false, true);
+      blockSender = new BlockSender(block, 0, -1, false,
+                                    false, true, datanode);
       DataOutputStream out =
         new DataOutputStream(new IOUtils.NullOutputStream());