Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Tue Nov 1 05:16:53 2011
@@ -403,8 +403,8 @@ class BlockPoolSliceScanner {
     try {
       adjustThrottler();
-      blockSender = new BlockSender(block, 0, -1, false, false, true,
-          datanode, null);
+      blockSender = new BlockSender(block, 0, -1, false, true, datanode,
+          null);

       DataOutputStream out =
           new DataOutputStream(new IOUtils.NullOutputStream());
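The scanner above is one of several callers updated for the narrowed BlockSender constructor: the chunkOffsetOK flag is gone because checksum metadata no longer travels as a raw inline header ahead of the block data. A minimal sketch of the new caller contract, assuming block and datanode are in scope as in the scanner code (not a literal excerpt from this commit):

    // Scan the whole block (length -1); the remaining booleans are
    // corruptChecksumOk=false and verifyChecksum=true. There is no longer
    // a flag asking the sender to emit an inline checksum header.
    BlockSender blockSender = new BlockSender(block, 0, -1, false, true,
        datanode, null);

    // What the old header carried is now queried from the sender and
    // shipped in the protobuf response instead (see getChecksum() and
    // getOffset() added to BlockSender below).
    DataChecksum checksum = blockSender.getChecksum();
    long chunkAlignedOffset = blockSender.getOffset();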
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Nov 1 05:16:53 2011
@@ -108,7 +108,8 @@ class BlockReceiver implements Closeable
       final BlockConstructionStage stage, final long newGs,
       final long minBytesRcvd, final long maxBytesRcvd,
       final String clientname, final DatanodeInfo srcDataNode,
-      final DataNode datanode) throws IOException {
+      final DataNode datanode, DataChecksum requestedChecksum)
+          throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -177,7 +178,7 @@ class BlockReceiver implements Closeable
       }
     }
     // read checksum meta information
-    this.checksum = DataChecksum.newDataChecksum(in);
+    this.checksum = requestedChecksum;
     this.bytesPerChecksum = checksum.getBytesPerChecksum();
     this.checksumSize = checksum.getChecksumSize();
     this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
@@ -687,11 +688,6 @@ class BlockReceiver implements Closeable
     }
   }

-  void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
-    checksum.writeHeader(mirrorOut);
-  }
-
-
   void receiveBlock(
       DataOutputStream mirrOut, // output to next datanode
       DataInputStream mirrIn,   // input from next datanode

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Nov 1 05:16:53 2011
@@ -134,8 +134,6 @@ class BlockSender implements java.io.Clo
   private final int checksumSize;
   /** If true, failure to read checksum is ignored */
   private final boolean corruptChecksumOk;
-  /** true if chunk offset is needed to be sent in Checksum header */
-  private final boolean chunkOffsetOK;
   /** Sequence number of packet being sent */
   private long seqno;
   /** Set to true if transferTo is allowed for sending data to the client */
@@ -173,19 +171,17 @@ class BlockSender implements java.io.Clo
    * @param startOffset starting offset to read from
    * @param length length of data to read
    * @param corruptChecksumOk
-   * @param chunkOffsetOK need to send check offset in checksum header
    * @param verifyChecksum verify checksum while reading the data
    * @param datanode datanode from which the block is being read
    * @param clientTraceFmt format string used to print client trace logs
    * @throws IOException
    */
   BlockSender(ExtendedBlock block, long startOffset, long length,
-              boolean corruptChecksumOk, boolean chunkOffsetOK,
-              boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
+              boolean corruptChecksumOk, boolean verifyChecksum,
+              DataNode datanode, String clientTraceFmt)
       throws IOException {
     try {
       this.block = block;
-      this.chunkOffsetOK = chunkOffsetOK;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
       this.clientTraceFmt = clientTraceFmt;
@@ -600,8 +596,6 @@ class BlockSender implements java.io.Clo
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
-      writeChecksumHeader(out);
-
       int maxChunksPerPacket;
       int pktSize = PacketHeader.PKT_HEADER_LEN;
       boolean transferTo = transferToAllowed && !verifyChecksum
@@ -691,22 +685,6 @@ class BlockSender implements java.io.Clo
     return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
   }

-
-  /**
-   * Write checksum header to the output stream
-   */
-  private void writeChecksumHeader(DataOutputStream out) throws IOException {
-    try {
-      checksum.writeHeader(out);
-      if (chunkOffsetOK) {
-        out.writeLong(offset);
-      }
-      out.flush();
-    } catch (IOException e) { //socket error
-      throw ioeToSocketException(e);
-    }
-  }
-
   /**
    * Write packet header into {@code pkt}
    */
@@ -720,4 +698,19 @@ class BlockSender implements java.io.Clo
   boolean didSendEntireByteRange() {
     return sentEntireByteRange;
   }
+
+  /**
+   * @return the checksum type that will be used with this block transfer.
+   */
+  DataChecksum getChecksum() {
+    return checksum;
+  }
+
+  /**
+   * @return the offset into the block file where the sender is currently
+   * reading.
+   */
+  long getOffset() {
+    return offset;
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Nov 1 05:16:53 2011
@@ -2053,7 +2053,7 @@ public class DataNode extends Configured
         out = new DataOutputStream(new BufferedOutputStream(baseStream,
             HdfsConstants.SMALL_BUFFER_SIZE));
         blockSender = new BlockSender(b, 0, b.getNumBytes(),
-            false, false, false, DataNode.this, null);
+            false, false, DataNode.this, null);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);

         //
@@ -2066,7 +2066,7 @@ public class DataNode extends Configured
         }

         new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
-            stage, 0, 0, 0, 0);
+            stage, 0, 0, 0, 0, blockSender.getChecksum());

         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);
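With writeChecksumHeader() deleted, a peer reading a block no longer parses a raw header off the data stream; the checksum type and the chunk-aligned start offset arrive inside the delimited BlockOpResponseProto. A hedged client-side sketch, assuming in is a DataInputStream positioned after a READ_BLOCK request has been sent (parseDelimitedFrom is the standard protobuf-java counterpart of the writeDelimitedTo framing used by the datanode):

    BlockOpResponseProto response = BlockOpResponseProto.parseDelimitedFrom(in);
    if (response.getStatus() != Status.SUCCESS) {
      throw new IOException("read failed: " + response.getMessage());
    }
    // Recover the negotiated checksum and where the first packet starts;
    // the offset may be earlier than requested, since reads align backwards
    // to a checksum chunk boundary.
    ReadOpChecksumInfoProto ckInfo = response.getReadOpChecksumInfo();
    DataChecksum checksum = DataTransferProtoUtil.fromProto(ckInfo.getChecksum());
    long firstChunkOffset = ckInfo.getChunkOffset();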
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Nov 1 05:16:53 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -225,7 +227,7 @@ class DataXceiver extends Receiver imple
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
-            true, true, false, datanode, clientTraceFmt);
+            true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e;
         LOG.info(msg);
@@ -234,7 +236,8 @@ class DataXceiver extends Receiver imple
       }

       // send op status
-      sendResponse(s, SUCCESS, null, datanode.socketWriteTimeout);
+      writeSuccessWithChecksumInfo(blockSender,
+          getStreamWithTimeout(s, datanode.socketWriteTimeout));

       long read = blockSender.sendBlock(out, baseStream, null); // send data
@@ -292,7 +295,8 @@ class DataXceiver extends Receiver imple
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException {
+      final long latestGenerationStamp,
+      DataChecksum requestedChecksum) throws IOException {
     updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
     final boolean isDatanode = clientname.length() == 0;
     final boolean isClient = !isDatanode;
@@ -351,7 +355,7 @@ class DataXceiver extends Receiver imple
             s.getRemoteSocketAddress().toString(),
             s.getLocalSocketAddress().toString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
-            clientname, srcDataNode, datanode);
+            clientname, srcDataNode, datanode, requestedChecksum);
       } else {
         datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
@@ -381,11 +385,8 @@ class DataXceiver extends Receiver imple
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken, clientname,
               targets, srcDataNode, stage, pipelineSize,
-              minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
+              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);

-          if (blockReceiver != null) { // send checksum header
-            blockReceiver.writeChecksumHeader(mirrorOut);
-          }
           mirrorOut.flush();

           // read connect ack (only for clients, not for replication req)
@@ -600,8 +601,8 @@ class DataXceiver extends Receiver imple

     try {
       // check if the block exists or not
-      blockSender = new BlockSender(block, 0, -1, false, false, false,
-          datanode, null);
+      blockSender = new BlockSender(block, 0, -1, false, false, datanode,
+          null);

       // set up response stream
       OutputStream baseStream = NetUtils.getOutputStream(
@@ -610,7 +611,7 @@ class DataXceiver extends Receiver imple
           baseStream, HdfsConstants.SMALL_BUFFER_SIZE));

       // send status first
-      writeResponse(SUCCESS, null, reply);
+      writeSuccessWithChecksumInfo(blockSender, reply);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream,
                                         dataXceiverServer.balanceThrottler);
@@ -709,11 +710,16 @@ class DataXceiver extends Receiver imple
         throw new IOException("Copy block " + block + " from "
             + proxySock.getRemoteSocketAddress() + " failed");
       }
+
+      // get checksum info about the block we're copying
+      ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
+      DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
+          checksumInfo.getChecksum());

       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode);
+          null, 0, 0, 0, "", null, datanode, remoteChecksum);

       // receive a block
       blockReceiver.receiveBlock(null, null, null, null,
@@ -767,15 +773,19 @@ class DataXceiver extends Receiver imple
   * @param opStatus status message to write
   * @param timeout send timeout
   **/
-  private void sendResponse(Socket s, Status status, String message,
+  private static void sendResponse(Socket s, Status status, String message,
       long timeout) throws IOException {
-    DataOutputStream reply =
-      new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+    DataOutputStream reply = getStreamWithTimeout(s, timeout);
     writeResponse(status, message, reply);
   }

-  private void writeResponse(Status status, String message, OutputStream out)
+  private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
+      throws IOException {
+    return new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+  }
+
+  private static void writeResponse(Status status, String message, OutputStream out)
   throws IOException {
     BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
       .setStatus(status);
@@ -786,6 +796,22 @@ class DataXceiver extends Receiver imple
     out.flush();
   }

+  private void writeSuccessWithChecksumInfo(BlockSender blockSender,
+      DataOutputStream out) throws IOException {
+
+    ReadOpChecksumInfoProto ckInfo = ReadOpChecksumInfoProto.newBuilder()
+      .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum()))
+      .setChunkOffset(blockSender.getOffset())
+      .build();
+
+    BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
+      .setStatus(SUCCESS)
+      .setReadOpChecksumInfo(ckInfo)
+      .build();
+
+    response.writeDelimitedTo(out);
+    out.flush();
+  }
+
   private void checkAccess(DataOutputStream out, final boolean reply,
       final ExtendedBlock blk,
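Taken together, the DataXceiver changes negotiate the checksum once at pipeline setup: the client names a DataChecksum in the writeBlock op, each datanode builds its BlockReceiver from that requestedChecksum, and the identical value is forwarded downstream, replacing the old per-hop writeChecksumHeader call. A sketch of the sender side of that handshake, with block, token, targets, and mirrorOut as assumed stand-ins rather than code from this commit:

    DataChecksum requestedChecksum =
        DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);
    // The checksum rides inside OpWriteBlockProto; no raw header bytes
    // follow the op on the wire anymore.
    new Sender(mirrorOut).writeBlock(block, token, clientname, targets,
        srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
        latestGenerationStamp, requestedChecksum);
    mirrorOut.flush();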
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto Tue Nov 1 05:16:53 2011
@@ -40,6 +40,17 @@ message OpReadBlockProto {
   required uint64 offset = 2;
   required uint64 len = 3;
 }
+
+
+message ChecksumProto {
+  enum ChecksumType {
+    NULL = 0;
+    CRC32 = 1;
+    CRC32C = 2;
+  }
+  required ChecksumType type = 1;
+  required uint32 bytesPerChecksum = 2;
+}

 message OpWriteBlockProto {
   required ClientOperationHeaderProto header = 1;
@@ -69,6 +80,11 @@ message OpWriteBlockProto {
   required uint64 minBytesRcvd = 6;
   required uint64 maxBytesRcvd = 7;
   required uint64 latestGenerationStamp = 8;
+
+  /**
+   * The requested checksum mechanism for this block write.
+   */
+  required ChecksumProto requestedChecksum = 9;
 }

 message OpTransferBlockProto {
@@ -114,14 +130,30 @@ message PipelineAckProto {
   repeated Status status = 2;
 }

+/**
+ * Sent as part of the BlockOpResponseProto
+ * for READ_BLOCK and COPY_BLOCK operations.
+ */
+message ReadOpChecksumInfoProto {
+  required ChecksumProto checksum = 1;
+
+  /**
+   * The offset into the block at which the first packet
+   * will start. This is necessary since reads will align
+   * backwards to a checksum chunk boundary.
+   */
+  required uint64 chunkOffset = 2;
+}
+
 message BlockOpResponseProto {
   required Status status = 1;

   optional string firstBadLink = 2;
   optional OpBlockChecksumResponseProto checksumResponse = 3;
+  optional ReadOpChecksumInfoProto readOpChecksumInfo = 4;

   /** explanatory text which may be useful to log on the client side */
-  optional string message = 4;
+  optional string message = 5;
 }
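Since these are ordinary protobuf messages, protoc generates nested builders and enums under the DataTransferProtos outer class used by the imports elsewhere in this change. A sketch of constructing the new descriptors by hand, for instance from a test:

    ChecksumProto cs = ChecksumProto.newBuilder()
        .setType(ChecksumProto.ChecksumType.CRC32C) // newly added variant
        .setBytesPerChecksum(512)
        .build();

    ReadOpChecksumInfoProto info = ReadOpChecksumInfoProto.newBuilder()
        .setChecksum(cs)
        .setChunkOffset(0L) // reads align back to a chunk boundary
        .build();

Note that renumbering the optional message field from tag 4 to tag 5 breaks wire compatibility with anything that already wrote tag 4; presumably it is acceptable here only because this protobuf-based protocol had not yet shipped in a release.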
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Nov 1 05:16:53 2011
@@ -31,6 +31,7 @@ import java.util.Random;

 import junit.framework.TestCase;

+import org.apache.commons.digester.SetRootRule;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -59,6 +62,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
+import org.mockito.Mockito;

 /**
  * This tests data transfer protocol handling in the Datanode. It sends
@@ -68,6 +72,9 @@ public class TestDataTransferProtocol ex
   private static final Log LOG = LogFactory.getLog(
                     "org.apache.hadoop.hdfs.TestDataTransferProtocol");
+
+  private static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);

   DatanodeID datanode;
   InetSocketAddress dnAddr;
@@ -149,9 +156,6 @@ public class TestDataTransferProtocol ex

   private void writeZeroLengthPacket(ExtendedBlock block, String description)
   throws IOException {
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);         // checksum size
-
     PacketHeader hdr = new PacketHeader(
       8,                   // size of packet
       block.getNumBytes(), // OffsetInBlock
@@ -188,7 +192,8 @@ public class TestDataTransferProtocol ex
     recvBuf.reset();
     sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null, stage,
-        0, block.getNumBytes(), block.getNumBytes(), newGS);
+        0, block.getNumBytes(), block.getNumBytes(), newGS,
+        DEFAULT_CHECKSUM);
     if (eofExcepted) {
       sendResponse(Status.ERROR, null, null, recvOut);
       sendRecvData(description, true);
@@ -373,15 +378,16 @@ public class TestDataTransferProtocol ex

     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
+
+    DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM);
+    Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum();
+
     sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE,
-        0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-
-    // bad bytes per checksum
-    sendOut.writeInt(-1-random.nextInt(oneMil));
+        0, 0L, 0L, 0L,
+        badChecksum);
     recvBuf.reset();
     sendResponse(Status.ERROR, null, null, recvOut);
     sendRecvData("wrong bytesPerChecksum while writing", true);
@@ -391,9 +397,8 @@ public class TestDataTransferProtocol ex
     sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
+        DEFAULT_CHECKSUM);

     PacketHeader hdr = new PacketHeader(
       4,     // size of packet
@@ -414,9 +419,8 @@ public class TestDataTransferProtocol ex
     sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);         // checksum size
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
+        DEFAULT_CHECKSUM);

     hdr = new PacketHeader(
       8,     // size of packet
@@ -462,7 +466,15 @@ public class TestDataTransferProtocol ex

     // negative length is ok. Datanode assumes we want to read the whole block.
     recvBuf.reset();
-    sendResponse(Status.SUCCESS, null, null, recvOut);
+
+    BlockOpResponseProto.newBuilder()
+      .setStatus(Status.SUCCESS)
+      .setReadOpChecksumInfo(ReadOpChecksumInfoProto.newBuilder()
+          .setChecksum(DataTransferProtoUtil.toProto(DEFAULT_CHECKSUM))
+          .setChunkOffset(0L))
+      .build()
+      .writeDelimitedTo(recvOut);
+
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         0L, -1L-random.nextInt(oneMil));

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Tue Nov 1 05:16:53 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.util.DataChecksum;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -140,14 +141,13 @@ public class TestDiskError {

     // write the header.
     DataOutputStream out = new DataOutputStream(s.getOutputStream());
+    DataChecksum checksum = DataChecksum.newDataChecksum(
+        DataChecksum.CHECKSUM_CRC32, 512);
     new Sender(out).writeBlock(block.getBlock(),
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L);
-
-    // write check header
-    out.writeByte( 1 );
-    out.writeInt( 512 );
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
+        checksum);
     out.flush();

     // close the connection before sending the content of the block
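The test updates replace hand-written header bytes with the typed API, which also makes the negative test cleaner: a Mockito spy can report an illegal geometry without having to construct an invalid DataChecksum. A condensed sketch of the pattern used above:

    DataChecksum defaultChecksum =
        DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);

    // A well-formed object with a corrupted view: the spy lies about
    // bytesPerChecksum, so the datanode must reject the write with ERROR.
    DataChecksum badChecksum = Mockito.spy(defaultChecksum);
    Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum();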
