Author: dhruba
Date: Wed Sep 26 09:58:48 2007
New Revision: 579716

URL: http://svn.apache.org/viewvc?rev=579716&view=rev
Log:
HADOOP-1908. Restructure data node code so that block sending and
receiving are separated from data transfer header handling.
(Hairong Kuang via dhruba)
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Sep 26 09:58:48 2007
@@ -193,6 +193,10 @@

   IMPROVEMENTS

+    HADOOP-1908. Restructure data node code so that block sending and
+    receiving are separated from data transfer header handling.
+    (Hairong Kuang via dhruba)
+
     HADOOP-1921. Save the configuration of completed/failed jobs and make
     them available via the web-ui. (Amar Kamat via devaraj)

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Sep 26 09:58:48 2007
@@ -1619,24 +1619,20 @@

       try {
-        while ( bytesLeft >= 0 ) {
+        while ( bytesLeft > 0 ) {
           int len = (int) Math.min( bytesLeft, bytesPerChecksum );
-          if ( len > 0 ) {
-            IOUtils.readFully( in, buf, 0, len + checksumSize);
-          }
+          IOUtils.readFully( in, buf, 0, len + checksumSize);

           blockStream.writeInt( len );
           blockStream.write( buf, 0, len + checksumSize );

-          if ( bytesLeft == 0 ) {
-            break;
-          }
-
           bytesLeft -= len;

           if (progress != null) {
             progress.progress();
           }
         }
-
+
+        // write 0 to mark the end of a block
+        blockStream.writeInt(0);
         blockStream.flush();

         numSuccessfulWrites++;
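The DFSClient hunk above changes the framing on the wire: a chunk length of
zero is now the end-of-block marker rather than a legal empty chunk. A minimal
sketch of the resulting framing, with illustrative names (writeChunks is not
an actual DFSClient method; `in` is assumed to supply data bytes already
interleaved with their checksums, as the client's staged block file does):

    import java.io.*;
    import org.apache.hadoop.io.IOUtils;

    /** Sketch: length-prefixed chunks of data+crc, terminated by a 0 length. */
    static void writeChunks(DataOutputStream blockStream, InputStream in,
                            long bytesLeft, int bytesPerChecksum,
                            int checksumSize) throws IOException {
      byte[] buf = new byte[bytesPerChecksum + checksumSize];
      while (bytesLeft > 0) {
        int len = (int) Math.min(bytesLeft, bytesPerChecksum);
        IOUtils.readFully(in, buf, 0, len + checksumSize);
        blockStream.writeInt(len);                      // chunk length
        blockStream.write(buf, 0, len + checksumSize);  // data, then crc
        bytesLeft -= len;
      }
      blockStream.writeInt(0);  // 0 marks the end of the block (version 6)
      blockStream.flush();
    }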
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Sep 26 09:58:48 2007
@@ -744,9 +744,10 @@
      * Read/write data from/to the DataXceiveServer.
      */
     public void run() {
+      DataInputStream in=null;
       try {
-        DataInputStream in = new DataInputStream(
-            new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
+        in = new DataInputStream(
+            new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
         short version = in.readShort();
         if ( version != DATA_TRANFER_VERSION ) {
           throw new IOException( "Version Mismatch" );
@@ -770,12 +771,10 @@
       } catch (Throwable t) {
         LOG.error("DataXceiver: " + StringUtils.stringifyException(t));
       } finally {
-        try {
-          xceiverCount.decr();
-          LOG.debug("Number of active connections is: "+xceiverCount);
-          s.close();
-        } catch (IOException ie2) {
-        }
+        xceiverCount.decr();
+        LOG.debug("Number of active connections is: "+xceiverCount);
+        IOUtils.closeStream(in);
+        IOUtils.closeSocket(s);
       }
     }

@@ -793,11 +792,23 @@
       long startOffset = in.readLong();
       long length = in.readLong();
-
+
+      // send the block
+      DataOutputStream out = new DataOutputStream(
+          new BufferedOutputStream(s.getOutputStream(), BUFFER_SIZE));
+      BlockSender blockSender = null;
       try {
-        //XXX Buffered output stream?
-        long read = sendBlock(s, block, startOffset, length, null );
-        myMetrics.readBytes((int)read);
+        try {
+          blockSender = new BlockSender(block, startOffset, length, true, true);
+        } catch(IOException e) {
+          out.writeShort(OP_STATUS_ERROR);
+          throw e;
+        }
+
+        out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
+        long read = blockSender.sendBlock(out, null); // send data
+
+        myMetrics.readBytes((int) read);
         myMetrics.readBlocks(1);
         LOG.info("Served block " + block + " to " + s.getInetAddress());
       } catch ( SocketException ignored ) {
@@ -808,14 +819,18 @@
          * Earlier version shutdown() datanode if there is disk error.
          */
         LOG.warn( "Got exception while serving " + block + " to " +
-                  s.getInetAddress() + ": " +
+                  s.getInetAddress() + ":\n" +
                   StringUtils.stringifyException(ioe) );
         throw ioe;
+      } finally {
+        IOUtils.closeStream(out);
+        IOUtils.closeStream(blockSender);
       }
     }
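A failed read now gets an explicit status short instead of a dropped
connection. A hypothetical client-side counterpart of the new reply handling
(checkReadReply is illustrative and not part of the patch; OP_STATUS_SUCCESS
is the FSConstants value used above):

    // Sketch: consume the new OP_READ_BLOCK reply. The datanode sends a
    // status short first; block data follows only on success.
    static void checkReadReply(java.io.DataInputStream reply)
        throws java.io.IOException {
      short status = reply.readShort();
      if (status != OP_STATUS_SUCCESS) {
        throw new java.io.IOException("read rejected, status=" + status);
      }
      // On success: the DataChecksum header, then the starting chunk offset
      // (reads pass chunkOffsetOK=true), then length-prefixed chunks ending
      // with a 0-length marker.
    }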
     /**
      * Write a block to disk.
+     *
      * @param in The stream to read from
      * @throws IOException
      */
@@ -823,62 +838,45 @@
       //
       // Read in the header
       //
-      DataOutputStream reply = new DataOutputStream(s.getOutputStream());
-      DataOutputStream out = null;
-      DataOutputStream checksumOut = null;
-      Socket mirrorSock = null;
-      DataOutputStream mirrorOut = null;
-      DataInputStream mirrorIn = null;
-
+      Block block = new Block(in.readLong(), 0);
+      int numTargets = in.readInt();
+      if (numTargets < 0) {
+        throw new IOException("Mislabelled incoming datastream.");
+      }
+      DatanodeInfo targets[] = new DatanodeInfo[numTargets];
+      for (int i = 0; i < targets.length; i++) {
+        DatanodeInfo tmp = new DatanodeInfo();
+        tmp.readFields(in);
+        targets[i] = tmp;
+      }
+
+      short opStatus = OP_STATUS_SUCCESS; // write operation status
+      DataOutputStream mirrorOut = null;  // stream to next target
+      Socket mirrorSock = null;           // socket to next target
+      BlockReceiver blockReceiver = null; // responsible for data handling
       try {
-        /* We need an estimate for block size to check if the
-         * disk partition has enough space. For now we just increment
-         * FSDataset.reserved by configured dfs.block.size
-         * Other alternative is to include the block size in the header
-         * sent by DFSClient.
-         */
-        Block block = new Block( in.readLong(), 0 );
-        int numTargets = in.readInt();
-        if ( numTargets < 0 ) {
-          throw new IOException("Mislabelled incoming datastream.");
-        }
-        DatanodeInfo targets[] = new DatanodeInfo[numTargets];
-        for (int i = 0; i < targets.length; i++) {
-          DatanodeInfo tmp = new DatanodeInfo();
-          tmp.readFields(in);
-          targets[i] = tmp;
-        }
-
-        DataChecksum checksum = DataChecksum.newDataChecksum( in );
+        // open a block receiver and check if the block does not exist
+        blockReceiver = new BlockReceiver(block, in,
+            s.getRemoteSocketAddress().toString());

         //
-        // Open local disk out
-        //
-        FSDataset.BlockWriteStreams streams = data.writeToBlock( block );
-        out = new DataOutputStream(
-            new BufferedOutputStream(streams.dataOut, BUFFER_SIZE));
-        checksumOut = new DataOutputStream(
-            new BufferedOutputStream(streams.checksumOut, BUFFER_SIZE));
-
-        InetSocketAddress mirrorTarget = null;
-        String mirrorNode = null;
-        //
         // Open network conn to backup machine, if
         // appropriate
         //
         if (targets.length > 0) {
+          InetSocketAddress mirrorTarget = null;
+          String mirrorNode = null;
           // Connect to backup machine
           mirrorNode = targets[0].getName();
           mirrorTarget = createSocketAddr(mirrorNode);
+          mirrorSock = new Socket();
           try {
-            mirrorSock = new Socket();
             mirrorSock.connect(mirrorTarget, READ_TIMEOUT);
-            mirrorSock.setSoTimeout(READ_TIMEOUT);
+            mirrorSock.setSoTimeout(numTargets*READ_TIMEOUT);
             mirrorOut = new DataOutputStream(
                 new BufferedOutputStream(mirrorSock.getOutputStream(),
                                          BUFFER_SIZE));
-            mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
-            //Copied from DFSClient.java!
+            // Write header: Copied from DFSClient.java!
             mirrorOut.writeShort( DATA_TRANFER_VERSION );
             mirrorOut.write( OP_WRITE_BLOCK );
             mirrorOut.writeLong( block.getBlockId() );
@@ -886,196 +884,73 @@
             for ( int i = 1; i < targets.length; i++ ) {
               targets[i].write( mirrorOut );
             }
-            checksum.writeHeader( mirrorOut );
-            myMetrics.replicatedBlocks(1);
-          } catch (IOException ie) {
-            if (mirrorOut != null) {
-              LOG.info("Exception connecting to mirror " + mirrorNode
-                       + "\n" + StringUtils.stringifyException(ie));
-              mirrorOut = null;
-            }
+          } catch (IOException e) {
+            IOUtils.closeStream(mirrorOut);
+            mirrorOut = null;
+            IOUtils.closeSocket(mirrorSock);
+            mirrorSock = null;
           }
         }
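Both writeBlock() above and transferBlocks() near the end of this patch emit
the same pipeline header. For reference, the header layout collected into one
hypothetical helper (the method itself does not exist in the patch; the field
order is exactly what the hunks write):

    // Sketch: header each datanode forwards to the next write-pipeline target.
    static void writePipelineHeader(DataOutputStream mirrorOut, Block block,
                                    DatanodeInfo[] targets) throws IOException {
      mirrorOut.writeShort(DATA_TRANFER_VERSION);  // protocol version (now 6)
      mirrorOut.write(OP_WRITE_BLOCK);             // opcode
      mirrorOut.writeLong(block.getBlockId());     // block being written
      mirrorOut.writeInt(targets.length - 1);      // targets after this node
      for (int i = 1; i < targets.length; i++) {
        targets[i].write(mirrorOut);               // remaining pipeline nodes
      }
      // the checksum header follows, written by BlockReceiver.receiveBlock()
    }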
-
-        // XXX The following code is similar on both sides...
-
-        int bytesPerChecksum = checksum.getBytesPerChecksum();
-        int checksumSize = checksum.getChecksumSize();
-        byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
-        long blockLen = 0;
-        long lastOffset = 0;
-        long lastLen = 0;
-        short status = -1;
-        boolean headerWritten = false;
-
-        while ( true ) {
-          // Read one data chunk in each loop.
-
-          long offset = lastOffset + lastLen;
-          int len = in.readInt();
-          if ( len < 0 || len > bytesPerChecksum ) {
-            LOG.warn( "Got wrong length during writeBlock(" +
-                      block + ") from " + s.getRemoteSocketAddress() +
-                      " at offset " + offset + ": " + len +
-                      " expected <= " + bytesPerChecksum );
-            status = OP_STATUS_ERROR;
-            break;
-          }
-          in.readFully( buf, 0, len + checksumSize );
-
-          if ( len > 0 && checksumSize > 0 ) {
-            /*
-             * Verification is not included in the initial design.
-             * For now, it at least catches some bugs. Later, we can
-             * include this after showing that it does not affect
-             * performance much.
-             */
-            checksum.update( buf, 0, len );
-
-            if ( ! checksum.compare( buf, len ) ) {
-              throw new IOException( "Unexpected checksum mismatch " +
-                                     "while writing " + block +
-                                     " from " + s.getRemoteSocketAddress() );
-            }
-
-            checksum.reset();
-          }
+        String mirrorAddr = (mirrorSock == null) ? null :
+                            mirrorSock.getRemoteSocketAddress().toString();
+        blockReceiver.receiveBlock(mirrorOut, mirrorAddr, null);
+
+        /*
+         * Informing the name node could take a long long time! Should we wait
+         * till namenode is informed before responding with success to the
+         * client? For now we don't.
+         */
+        synchronized (receivedBlockList) {
+          receivedBlockList.add(block);
+          receivedBlockList.notifyAll();
+        }

-          // First write to remote node before writing locally.
-          if (mirrorOut != null) {
-            try {
-              mirrorOut.writeInt( len );
-              mirrorOut.write( buf, 0, len + checksumSize );
-              if (len == 0) {
-                mirrorOut.flush();
-              }
-            } catch (IOException ioe) {
-              LOG.info( "Exception writing to mirror " + mirrorNode +
-                        "\n" + StringUtils.stringifyException(ioe) );
-              //
-              // If stream-copy fails, continue
-              // writing to disk. We shouldn't
-              // interrupt client write.
-              //
-              mirrorOut = null;
-            }
-          }
+        String msg = "Received block " + block + " from " +
+                     s.getRemoteSocketAddress();

+        /* read response from next target in the pipeline.
+         * ignore the response for now. Will fix it in HADOOP-1927
+         */
+        if( mirrorSock != null ) {
+          short result = OP_STATUS_ERROR;
+          DataInputStream mirrorIn = null;
           try {
-            if ( !headerWritten ) {
-              // First DATA_CHUNK.
-              // Write the header even if checksumSize is 0.
-              checksumOut.writeShort( FSDataset.METADATA_VERSION );
-              checksum.writeHeader( checksumOut );
-              headerWritten = true;
-            }
-
-            if ( len > 0 ) {
-              out.write( buf, 0, len );
-              // Write checksum
-              checksumOut.write( buf, len, checksumSize );
-              myMetrics.wroteBytes( len );
-            } else {
-              /* Should we sync() files here? It can add many millisecs of
-               * latency. We did not sync before HADOOP-1134 either.
-               */
-              out.close();
-              out = null;
-              checksumOut.close();
-              checksumOut = null;
-            }
-
-          } catch (IOException iex) {
-            checkDiskError(iex);
-            throw iex;
+            mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
+            result = mirrorIn.readShort();
+          } catch (IOException ignored) {
+          } finally {
+            IOUtils.closeStream(mirrorIn);
           }
-
-          if ( len == 0 ) {
-
-            // We already have one successful write here. Should we
-            // wait for response from next target? We will skip for now.
-            block.setNumBytes( blockLen );
-
-            //Does this fsync()?
-            data.finalizeBlock( block );
-            myMetrics.wroteBlocks(1);
-
-            status = OP_STATUS_SUCCESS;
-
-            break;
-          }
-
-          if ( lastLen > 0 && lastLen != bytesPerChecksum ) {
-            LOG.warn( "Got wrong length during writeBlock(" +
-                      block + ") from " + s.getRemoteSocketAddress() +
-                      " : " + " got " + lastLen + " instead of " +
-                      bytesPerChecksum );
-            status = OP_STATUS_ERROR;
-            break;
-          }
-
-          lastOffset = offset;
-          lastLen = len;
-          blockLen += len;
-        }
-        // done with reading the data.
-
-        if ( status == OP_STATUS_SUCCESS ) {
-          /* Informing the name node could take a long long time!
-             Should we wait till namenode is informed before responding
-             with success to the client? For now we don't.
-          */
-          synchronized ( receivedBlockList ) {
-            receivedBlockList.add( block );
-            receivedBlockList.notifyAll();
-          }
-
-          String msg = "Received block " + block + " from " +
-                       s.getInetAddress();
-
-          if ( mirrorOut != null ) {
-            //Wait for the remote reply
-            mirrorOut.flush();
-            short result = OP_STATUS_ERROR;
-            try {
-              result = mirrorIn.readShort();
-            } catch ( IOException ignored ) {}
-
-            msg += " and " + (( result != OP_STATUS_SUCCESS ) ?
-                              "failed to mirror to " : " mirrored to ") +
-                   mirrorTarget;
-
-            mirrorOut = null;
-          }
-
-          LOG.info(msg);
-        }
-
-        if ( status >= 0 ) {
-          try {
-            reply.writeShort( status );
-            reply.flush();
-          } catch ( IOException ignored ) {}
+          msg += " and " + (( result != OP_STATUS_SUCCESS ) ?
+                            "failed to mirror to " : " mirrored to ") +
+                 mirrorAddr;
         }
-
+
+        LOG.info(msg);
+      } catch (IOException ioe) {
+        opStatus = OP_STATUS_ERROR;
+        throw ioe;
       } finally {
+        // send back reply
+        DataOutputStream reply = new DataOutputStream(s.getOutputStream());
         try {
-          if ( out != null )
-            out.close();
-          if ( checksumOut != null )
-            checksumOut.close();
-          if ( mirrorSock != null )
-            mirrorSock.close();
-        } catch (IOException iex) {
-          shutdown();
-          throw iex;
-        }
+          reply.writeShort(opStatus);
+          reply.flush();
+        } catch (IOException ioe) {
+          LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress() +
+                   " for writing block " + block );
+          LOG.warn(StringUtils.stringifyException(ioe));
+        }
+        // close all opened streams
+        IOUtils.closeStream(reply);
+        IOUtils.closeStream(mirrorOut);
+        IOUtils.closeSocket(mirrorSock);
+        IOUtils.closeStream(blockReceiver);
       }
     }
-
+
     /**
      * Reads the metadata and sends the data in one 'DATA_CHUNK'
      * @param in
@@ -1113,173 +988,401 @@
       }
     }
   }
+
+  /* An interface to throttle the block transfers */
+  private interface Throttler {
+    /** Given the numOfBytes sent/received since last time throttle was called,
+     * make the current thread sleep if I/O rate is too fast
+     * compared to the given bandwidth
+     *
+     * @param numOfBytes
+     *        number of bytes sent/received since last time throttle was called
+     */
+    void throttle(int numOfBytes);
+  }
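The patch only introduces the Throttler interface; BlockSender.sendBlock() and
BlockReceiver.receiveBlock() accept one, but every caller in this revision
passes null. A purely hypothetical sketch of an implementation, to illustrate
the intended contract (class name and pacing scheme are not from the patch):

    /* Hypothetical Throttler: sleep whenever the bytes moved in the current
     * accounting period get ahead of the allowed bytes/second budget. */
    private static class SimpleThrottler implements Throttler {
      private final long bytesPerSec;   // configured bandwidth cap
      private long periodStart = System.currentTimeMillis();
      private long bytesMoved = 0;      // bytes seen this period

      SimpleThrottler(long bytesPerSec) {
        this.bytesPerSec = bytesPerSec;
      }

      public synchronized void throttle(int numOfBytes) {
        bytesMoved += numOfBytes;
        long deservedMs = bytesMoved * 1000 / bytesPerSec; // time this should take
        long elapsedMs = System.currentTimeMillis() - periodStart;
        if (deservedMs > elapsedMs) {
          try {
            Thread.sleep(deservedMs - elapsedMs);  // ahead of budget: back off
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
        if (elapsedMs > 1000) {          // roll over the accounting period
          periodStart = System.currentTimeMillis();
          bytesMoved = 0;
        }
      }
    }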
-    /** sendBlock() is used to read block and its metadata and stream
-     * the data to either a client or to another datanode.
-     * If argument targets is null, then it is assumed to be replying
-     * to a client request (OP_BLOCK_READ). Otherwise, we are replicating
-     * to another datanode.
-     *
-     * returns total bytes reads, including crc.
-     */
-    long sendBlock(Socket sock, Block block,
-                   long startOffset, long length, DatanodeInfo targets[] )
-                   throws IOException {
-      DataOutputStream out = new DataOutputStream(
-          new BufferedOutputStream(sock.getOutputStream(), BUFFER_SIZE));
-      RandomAccessFile blockInFile = null;
-      DataInputStream blockIn = null;
-      DataInputStream checksumIn = null;
-      long totalRead = 0;
-
-      /* XXX This will affect inter datanode transfers during
-       * a CRC upgrade. There should not be any replication
-       * during crc upgrade since we are in safe mode, right?
-       */
-      boolean corruptChecksumOk = targets == null;
-
-      try {
-        File blockFile = data.getBlockFile( block );
-        blockInFile = new RandomAccessFile(blockFile, "r");
-
-        File checksumFile = FSDataset.getMetaFile( blockFile );
-        DataChecksum checksum = null;
-
-        if ( !corruptChecksumOk || checksumFile.exists() ) {
-          checksumIn = new DataInputStream(
-              new BufferedInputStream(new FileInputStream(checksumFile),
-                                      BUFFER_SIZE));
-
-          //read and handle the common header here. For now just a version
-          short version = checksumIn.readShort();
-          if ( version != FSDataset.METADATA_VERSION ) {
-            LOG.warn( "Wrong version (" + version +
-                      ") for metadata file for " + block + " ignoring ..." );
-          }
-          checksum = DataChecksum.newDataChecksum( checksumIn ) ;
-        } else {
-          LOG.warn( "Could not find metadata file for " + block );
-          // This only decides the buffer size. Use BUFFER_SIZE?
-          checksum = DataChecksum.newDataChecksum( DataChecksum.CHECKSUM_NULL,
-                                                   16*1024 );
-        }
-
-        int bytesPerChecksum = checksum.getBytesPerChecksum();
-        int checksumSize = checksum.getChecksumSize();
-
-        if (length < 0) {
-          length = data.getLength(block);
-        }
-
-        long endOffset = data.getLength( block );
-        if ( startOffset < 0 || startOffset > endOffset ||
-             (length + startOffset) > endOffset ) {
-          String msg = " Offset " + startOffset + " and length " + length +
-                       " don't match block " + block + " ( blockLen " +
-                       endOffset + " )";
-          LOG.warn( "sendBlock() : " + msg );
-          if ( targets != null ) {
-            throw new IOException(msg);
-          } else {
-            out.writeShort( OP_STATUS_ERROR_INVALID );
-            return totalRead;
-          }
-        }
-
-        byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
-        long offset = (startOffset - (startOffset % bytesPerChecksum));
-        if ( length >= 0 ) {
-          // Make sure endOffset points to end of a checksumed chunk.
-          long tmpLen = startOffset + length + (startOffset - offset);
-          if ( tmpLen % bytesPerChecksum != 0 ) {
-            tmpLen += ( bytesPerChecksum - tmpLen % bytesPerChecksum );
-          }
-          if ( tmpLen < endOffset ) {
-            endOffset = tmpLen;
-          }
-        }
-
-        // seek to the right offsets
-        if ( offset > 0 ) {
-          long checksumSkip = ( offset / bytesPerChecksum ) * checksumSize ;
-          blockInFile.seek(offset);
-          if (checksumSkip > 0) {
-            //Should we use seek() for checksum file as well?
-            IOUtils.skipFully(checksumIn, checksumSkip);
-          }
-        }
-
-        blockIn = new DataInputStream(new BufferedInputStream(
-            new FileInputStream(blockInFile.getFD()), BUFFER_SIZE));
-
-        if ( targets != null ) {
+  private class BlockSender implements java.io.Closeable {
+    private Block block;                // the block to read from
+    private DataInputStream blockIn;    // data stream
+    private DataInputStream checksumIn; // checksum datastream
+    private DataChecksum checksum;      // checksum stream
+    private long offset;                // starting position to read
+    private long endOffset;             // ending position
+    private byte buf[];  // buffer to store data read from the block file & crc
+    private int bytesPerChecksum;       // chunk size
+    private int checksumSize;           // checksum size
+    private boolean corruptChecksumOk;  // if need to verify checksum
+    private boolean chunkOffsetOK;      // if need to send chunk offset
+
+    private Throttler throttler;
+    private DataOutputStream out;
+
+    BlockSender(Block block, long startOffset, long length,
+                boolean corruptChecksumOk, boolean chunkOffsetOK)
+        throws IOException {
+      RandomAccessFile blockInFile = null;
+
+      try {
+        this.block = block;
+        this.chunkOffsetOK = chunkOffsetOK;
+        this.corruptChecksumOk = corruptChecksumOk;
+        File blockFile = data.getBlockFile(block);
+        blockInFile = new RandomAccessFile(blockFile, "r");
+
+        File checksumFile = FSDataset.getMetaFile(blockFile);
+
+        if (!corruptChecksumOk || checksumFile.exists()) {
+          checksumIn = new DataInputStream(new BufferedInputStream(
+              new FileInputStream(checksumFile), BUFFER_SIZE));
+
+          // read and handle the common header here. For now just a version
+          short version = checksumIn.readShort();
+          if (version != FSDataset.METADATA_VERSION) {
+            LOG.warn("Wrong version (" + version + ") for metadata file for "
+                + block + " ignoring ...");
+          }
+          checksum = DataChecksum.newDataChecksum(checksumIn);
+        } else {
+          LOG.warn("Could not find metadata file for " + block);
+          // This only decides the buffer size. Use BUFFER_SIZE?
+          checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
+              16 * 1024);
+        }
+
+        bytesPerChecksum = checksum.getBytesPerChecksum();
+        checksumSize = checksum.getChecksumSize();
+
+        if (length < 0) {
+          length = data.getLength(block);
+        }
+
+        endOffset = data.getLength(block);
+        if (startOffset < 0 || startOffset > endOffset
+            || (length + startOffset) > endOffset) {
+          String msg = " Offset " + startOffset + " and length " + length
+              + " don't match block " + block + " ( blockLen " + endOffset + " )";
+          LOG.warn("sendBlock() : " + msg);
+          throw new IOException(msg);
+        }
+
+        buf = new byte[bytesPerChecksum + checksumSize];
+        offset = (startOffset - (startOffset % bytesPerChecksum));
+        if (length >= 0) {
+          // Make sure endOffset points to end of a checksumed chunk.
+          long tmpLen = startOffset + length + (startOffset - offset);
+          if (tmpLen % bytesPerChecksum != 0) {
+            tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+          }
+          if (tmpLen < endOffset) {
+            endOffset = tmpLen;
+          }
+        }
+
+        // seek to the right offsets
+        if (offset > 0) {
+          long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+          blockInFile.seek(offset);
+          if (checksumSkip > 0) {
+            // Should we use seek() for checksum file as well?
+            IOUtils.skipFully(checksumIn, checksumSkip);
+          }
+        }
+
+        blockIn = new DataInputStream(new BufferedInputStream(
+            new FileInputStream(blockInFile.getFD()), BUFFER_SIZE));
+      } catch (IOException ioe) {
+        IOUtils.closeStream(this);
+        IOUtils.closeStream(blockInFile);
+        throw ioe;
+      }
+    }
+
+    // close opened files
+    public void close() throws IOException {
+      IOException ioe = null;
+      // close checksum file
+      if(checksumIn!=null) {
+        try {
+          checksumIn.close();
+        } catch (IOException e) {
+          ioe = e;
+        }
+        checksumIn = null;
+      }
+      // close data file
+      if(blockIn!=null) {
+        try {
+          blockIn.close();
+        } catch (IOException e) {
+          ioe = e;
+        }
+        blockIn = null;
+      }
+      // throw IOException if there is any
+      if(ioe!= null) {
+        throw ioe;
+      }
+    }
+
+    private int sendChunk()
+        throws IOException {
+      int len = (int) Math.min(endOffset - offset, bytesPerChecksum);
+      if (len == 0)
+        return 0;
+      blockIn.readFully(buf, 0, len);
+
+      if (checksumSize > 0 && checksumIn != null) {
+        try {
+          checksumIn.readFully(buf, len, checksumSize);
+        } catch (IOException e) {
+          LOG.warn(" Could not read checksum for data at offset " + offset
+              + " for block " + block + " got : "
+              + StringUtils.stringifyException(e));
+          IOUtils.closeStream(checksumIn);
+          checksumIn = null;
+          if (corruptChecksumOk) {
+            // Just fill the array with zeros.
+            Arrays.fill(buf, len, len + checksumSize, (byte) 0);
+          } else {
+            throw e;
+          }
+        }
+      }
+
+      out.writeInt(len);
+      out.write(buf, 0, len + checksumSize);
+
+      if (throttler != null) { // rebalancing so throttle
+        throttler.throttle(len + checksumSize);
+      }
+
+      return len;
+    }
+
+    /**
+     * sendBlock() is used to read block and its metadata and stream the data
+     * to either a client or to another datanode.
+     *
+     * @param out stream to which the block is written to
+     * returns total bytes read, including crc.
+     */
+    long sendBlock(DataOutputStream out, Throttler throttler)
+        throws IOException {
+      if( out == null ) {
+        throw new IOException( "out stream is null" );
+      }
+      this.out = out;
+      this.throttler = throttler;
+
+      long totalRead = 0;
+      try {
+        checksum.writeHeader(out);
+        if ( chunkOffsetOK ) {
+          out.writeLong( offset );
+        }
+
+        while (endOffset > offset) {
+          // Write one data chunk per loop.
+          long len = sendChunk();
+          offset += len;
+          totalRead += len + checksumSize;
+        }
+        out.writeInt(0); // mark the end of block
+        out.flush();
+      } finally {
+        close();
+      }
+
+      return totalRead;
+    }
+  }
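The two halves now compose symmetrically: readBlock() earlier pairs a
BlockSender with the client socket, and transferBlocks() below pairs one with
the next datanode. The usage pattern, restated from readBlock() for reference:

    // Serving a read request with the new abstraction (as in readBlock):
    BlockSender blockSender = null;
    try {
      // true, true: tolerate a corrupt/missing checksum file, and send the
      // starting chunk offset to the client
      blockSender = new BlockSender(block, startOffset, length, true, true);
      out.writeShort(OP_STATUS_SUCCESS);            // op status goes first
      long read = blockSender.sendBlock(out, null); // null: no throttling
    } finally {
      IOUtils.closeStream(blockSender);             // BlockSender is Closeable
    }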
+ * */ + private class BlockReceiver implements java.io.Closeable { + private Block block; // the block to receive + private DataInputStream in = null; // from where data are read + private DataChecksum checksum; // from where chunks of a block can be read + private DataOutputStream out = null; // to block file at local disk + private DataOutputStream checksumOut = null; // to crc file at local disk + private int bytesPerChecksum; + private int checksumSize; + private byte buf[]; + private long offset; + final private String inAddr; + private String mirrorAddr; + private DataOutputStream mirrorOut; + private Throttler throttler; + private int lastLen = -1; + private int curLen = -1; + + BlockReceiver(Block block, DataInputStream in, String inAddr) + throws IOException { + try{ + this.block = block; + this.in = in; + this.inAddr = inAddr; + this.checksum = DataChecksum.newDataChecksum(in); + this.bytesPerChecksum = checksum.getBytesPerChecksum(); + this.checksumSize = checksum.getChecksumSize(); + this.buf = new byte[bytesPerChecksum + checksumSize]; + // - // Header info + // Open local disk out // - out.writeShort( DATA_TRANFER_VERSION ); - out.writeByte( OP_WRITE_BLOCK ); - out.writeLong( block.getBlockId() ); - out.writeInt(targets.length-1); - for (int i = 1; i < targets.length; i++) { - targets[i].write( out ); + FSDataset.BlockWriteStreams streams = data.writeToBlock(block); + this.out = new DataOutputStream(new BufferedOutputStream( + streams.dataOut, BUFFER_SIZE)); + this.checksumOut = new DataOutputStream(new BufferedOutputStream( + streams.checksumOut, BUFFER_SIZE)); + } catch(IOException ioe) { + IOUtils.closeStream(this); + throw ioe; + } + } + + // close files + public void close() throws IOException { + IOException ioe = null; + // close checksum file + try { + if (checksumOut != null) { + checksumOut.close(); + checksumOut = null; } - } else { - out.writeShort( OP_STATUS_SUCCESS ); + } catch(IOException e) { + ioe = e; + } + // close block file + try { + if (out != null) { + out.close(); + out = null; + } + } catch (IOException e) { + ioe = e; + } + // disk check + if(ioe != null) { + checkDiskError(ioe); + throw ioe; + } + } + + /* receive a chunk: write it to disk & mirror it to another stream */ + private void receiveChunk( int len ) throws IOException { + if (len <= 0 || len > bytesPerChecksum) { + throw new IOException("Got wrong length during writeBlock(" + block + + ") from " + inAddr + " at offset " + offset + ": " + len + + " expected <= " + bytesPerChecksum); } - checksum.writeHeader( out ); - - if ( targets == null ) { - out.writeLong( offset ); + if (lastLen > 0 && lastLen != bytesPerChecksum) { + throw new IOException("Got wrong length during receiveBlock(" + block + + ") from " + inAddr + " : " + " got " + lastLen + " instead of " + + bytesPerChecksum); } - - while ( endOffset >= offset ) { - // Write one data chunk per loop. - int len = (int) Math.min( endOffset - offset, bytesPerChecksum ); - if ( len > 0 ) { - blockIn.readFully( buf, 0, len ); - totalRead += len; - - if ( checksumSize > 0 && checksumIn != null ) { - try { - checksumIn.readFully( buf, len, checksumSize ); - totalRead += checksumSize; - } catch ( IOException e ) { - LOG.warn( " Could not read checksum for data at offset " + - offset + " for block " + block + " got : " + - StringUtils.stringifyException(e) ); - IOUtils.closeStream( checksumIn ); - checksumIn = null; - if ( corruptChecksumOk ) { - // Just fill the array with zeros. 
- Arrays.fill( buf, len, len + checksumSize, (byte)0 ); - } else { - throw e; - } - } - } - } - out.writeInt( len ); - out.write( buf, 0, len + checksumSize ); - - if ( offset == endOffset ) { - out.flush(); - // We are not waiting for response from target. - break; + lastLen = curLen; + curLen = len; + + in.readFully(buf, 0, len + checksumSize); + + /* + * Verification is not included in the initial design. For now, it at + * least catches some bugs. Later, we can include this after showing that + * it does not affect performance much. + */ + checksum.update(buf, 0, len); + + if (!checksum.compare(buf, len)) { + throw new IOException("Unexpected checksum mismatch " + + "while writing " + block + " from " + inAddr); + } + + checksum.reset(); + + // First write to remote node before writing locally. + if (mirrorOut != null) { + try { + mirrorOut.writeInt(len); + mirrorOut.write(buf, 0, len + checksumSize); + } catch (IOException ioe) { + LOG.info("Exception writing to mirror " + mirrorAddr + "\n" + + StringUtils.stringifyException(ioe)); + // + // If stream-copy fails, continue + // writing to disk. We shouldn't + // interrupt client write. + // + mirrorOut = null; } - offset += len; } - } finally { - IOUtils.closeStream( blockInFile ); - IOUtils.closeStream( checksumIn ); - IOUtils.closeStream( blockIn ); - IOUtils.closeStream( out ); + + try { + out.write(buf, 0, len); + // Write checksum + checksumOut.write(buf, len, checksumSize); + myMetrics.wroteBytes(len); + } catch (IOException iex) { + checkDiskError(iex); + throw iex; + } + + if (throttler != null) { // throttle I/O + throttler.throttle(len + checksumSize); + } + } + + public void receiveBlock(DataOutputStream mirrorOut, + String mirrorAddr, Throttler throttler) throws IOException { + + this.mirrorOut = mirrorOut; + this.mirrorAddr = mirrorAddr; + this.throttler = throttler; + + /* + * We need an estimate for block size to check if the disk partition has + * enough space. For now we just increment FSDataset.reserved by + * configured dfs.block.size Other alternative is to include the block + * size in the header sent by DFSClient. + */ + + try { + // write data chunk header + checksumOut.writeShort(FSDataset.METADATA_VERSION); + checksum.writeHeader(checksumOut); + if (mirrorOut != null) { + checksum.writeHeader(mirrorOut); + this.mirrorAddr = mirrorAddr; + } + + int len = in.readInt(); + while (len != 0) { + receiveChunk( len ); + offset += len; + len = in.readInt(); + } + + // flush the mirror out + if (mirrorOut != null) { + mirrorOut.writeInt(0); // mark the end of the stream + mirrorOut.flush(); + } + + // close the block/crc files + close(); + + // Finalize the block. Does this fsync()? 
+ block.setNumBytes(offset); + data.finalizeBlock(block); + myMetrics.wroteBlocks(1); + } catch (IOException ioe) { + IOUtils.closeStream(this); + throw ioe; + } } - - return totalRead; } /** @@ -1305,21 +1408,40 @@ public void run() { xmitsInProgress++; Socket sock = null; + DataOutputStream out = null; + BlockSender blockSender = null; try { InetSocketAddress curTarget = createSocketAddr(targets[0].getName()); sock = new Socket(); sock.connect(curTarget, READ_TIMEOUT); - sock.setSoTimeout(READ_TIMEOUT); - sendBlock( sock, b, 0, -1, targets ); - LOG.info( "Transmitted block " + b + " to " + curTarget ); - - } catch ( IOException ie ) { - LOG.warn( "Failed to transfer " + b + " to " + - targets[0].getName() + " got " + - StringUtils.stringifyException( ie ) ); + sock.setSoTimeout(targets.length*READ_TIMEOUT); + + out = new DataOutputStream(new BufferedOutputStream( + sock.getOutputStream(), BUFFER_SIZE)); + blockSender = new BlockSender(b, 0, -1, false, false); + + // + // Header info + // + out.writeShort(DATA_TRANFER_VERSION); + out.writeByte(OP_WRITE_BLOCK); + out.writeLong(b.getBlockId()); + // write targets + out.writeInt(targets.length - 1); + for (int i = 1; i < targets.length; i++) { + targets[i].write(out); + } + // send data & checksum + blockSender.sendBlock(out, null); + LOG.info("Transmitted block " + b + " to " + curTarget); + } catch (IOException ie) { + LOG.warn("Failed to transfer " + b + " to " + targets[0].getName() + + " got " + StringUtils.stringifyException(ie)); } finally { + IOUtils.closeStream(blockSender); + IOUtils.closeStream(out); IOUtils.closeSocket(sock); xmitsInProgress--; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=579716&r1=579715&r2=579716&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Sep 26 09:58:48 2007 @@ -97,7 +97,13 @@ * This should change when serialization of DatanodeInfo, not just * when protocol changes. It is not very obvious. */ - public static final int DATA_TRANFER_VERSION = 5; //Should it be 1? + /* Version 6: + * 0 marks the end of a block not an EMPTY_CHUNK + * OP_READ_BLOCK: return OP_STATUS_ERROR if received an invalid block id + * return OP_STATUS_ERROR if received an invalid length + * OP_WRITE_BLOCK: return OP_STATUS_ERROR if illegal bytesPerChecksum + */ + public static final int DATA_TRANFER_VERSION = 6; //Should it be 1? 
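The bump to version 6 is what makes the stricter behavior safe to rely on:
every connection starts with the handshake shown in DataXceiver.run() above,
so peers built against version 5 are rejected outright rather than
misinterpreting the new 0-length end-of-block marker:

    // Handshake check performed by DataXceiver.run() (shown earlier):
    short version = in.readShort();
    if ( version != DATA_TRANFER_VERSION ) {   // now 6
      throw new IOException( "Version Mismatch" );
    }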
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Wed Sep 26 09:58:48 2007
@@ -171,7 +171,9 @@
     byteBuf.put((byte)DataChecksum.CHECKSUM_CRC32);
     byteBuf.putInt(-1-random.nextInt(oneMil));
-    sendRecvData("wrong bytesPerChecksum while writing", true);
+    recvByteBuf.position(0);
+    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR);
+    sendRecvData("wrong bytesPerChecksum while writing", false);

     byteBuf.putInt(checksumPos+1, 512);
     byteBuf.putInt(targetPos, -1-random.nextInt(oneMil));
@@ -204,14 +206,11 @@
     byteBuf.putLong(0L);
     int lenPos = byteBuf.position();
     byteBuf.putLong(fileLen);
-    /* We should change DataNode to return ERROR_INVALID instead of closing
-     * the connection.
-     */
-    sendRecvData("Wrong block ID for read", true);
+    recvByteBuf.position(0);
+    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR);
+    sendRecvData("Wrong block ID for read", false);

     byteBuf.putLong(blockPos, firstBlock.getBlockId());
-    recvByteBuf.position(0);
-    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR_INVALID);
     byteBuf.putLong(startOffsetPos, -1-random.nextInt(oneMil));
     sendRecvData("Negative start-offset for read", false);

@@ -224,7 +223,7 @@
     byteBuf.putLong(lenPos, -1-random.nextInt(oneMil));
     sendRecvData("Negative length for read", false);

-    recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_ERROR_INVALID);
+    recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_ERROR);
     byteBuf.putLong(lenPos, fileLen+1);
     sendRecvData("Wrong length for read", false);
     byteBuf.putLong(lenPos, fileLen);
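The recurring pattern in the updated test: stage a malformed request in
byteBuf, stage the expected reply in recvByteBuf, and pass false for the
boolean, since the datanode now answers with OP_STATUS_ERROR instead of
dropping the connection (reading the boolean as an eof-expected flag is an
inference from the hunks above, not stated in the patch):

    recvByteBuf.position(0);
    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR);
    sendRecvData("wrong bytesPerChecksum while writing",
                 false /* connection expected to stay open */);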