Repository: hadoop
Updated Branches:
  refs/heads/trunk 0a5def155 -> 8b281bce8
HDFS-10548. Remove the long deprecated BlockReaderRemote. Contributed by Kai Zheng Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b281bce Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b281bce Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b281bce Branch: refs/heads/trunk Commit: 8b281bce85474501868d68f8d5590a6086abb7b7 Parents: 0a5def1 Author: Kai Zheng <[email protected]> Authored: Sun Jul 3 11:56:23 2016 +0800 Committer: Kai Zheng <[email protected]> Committed: Sun Jul 3 11:56:23 2016 +0800 ---------------------------------------------------------------------- .../hdfs/client/impl/BlockReaderFactory.java | 18 +- .../hdfs/client/impl/BlockReaderRemote.java | 570 +++++++++---------- .../hdfs/client/impl/BlockReaderRemote2.java | 474 --------------- .../hadoop/hdfs/client/impl/DfsClientConf.java | 12 - .../erasurecode/StripedBlockReader.java | 4 +- .../hdfs/client/impl/TestBlockReaderBase.java | 97 ---- .../hdfs/client/impl/TestBlockReaderRemote.java | 80 ++- .../client/impl/TestBlockReaderRemote2.java | 27 - .../impl/TestClientBlockVerification.java | 10 +- .../shortcircuit/TestShortCircuitLocalRead.java | 16 +- 10 files changed, 360 insertions(+), 948 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b281bce/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java index f4b62d9..5a22c33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java @@ -844,19 +844,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { @SuppressWarnings("deprecation") private BlockReader getRemoteBlockReader(Peer peer) throws IOException { int networkDistance = clientContext.getNetworkDistance(datanode); - if (conf.getShortCircuitConf().isUseLegacyBlockReader()) { - return BlockReaderRemote.newBlockReader(fileName, - block, token, startOffset, length, conf.getIoBufferSize(), - verifyChecksum, clientName, peer, datanode, - clientContext.getPeerCache(), cachingStrategy, tracer, - networkDistance); - } else { - return BlockReaderRemote2.newBlockReader( - fileName, block, token, startOffset, length, - verifyChecksum, clientName, peer, datanode, - clientContext.getPeerCache(), cachingStrategy, tracer, - networkDistance); - } + return BlockReaderRemote.newBlockReader( + fileName, block, token, startOffset, length, + verifyChecksum, clientName, peer, datanode, + clientContext.getPeerCache(), cachingStrategy, tracer, + networkDistance); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b281bce/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java index 22d4e23..5a2ce40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java @@ -17,72 +17,95 @@ */ package org.apache.hadoop.hdfs.client.impl; -import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.util.EnumSet; +import java.util.UUID; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.FSInputChecker; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.PeerCache; import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; import org.apache.htrace.core.TraceScope; + +import com.google.common.annotations.VisibleForTesting; + import org.apache.htrace.core.Tracer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** - * @deprecated this is an old implementation that is being left around - * in case any issues spring up with the new {@link BlockReaderRemote2} - * implementation. - * It will be removed in the next release. + * This is a wrapper around connection to datanode + * and understands checksum, offset etc. + * + * Terminology: + * <dl> + * <dt>block</dt> + * <dd>The hdfs block, typically large (~64MB). + * </dd> + * <dt>chunk</dt> + * <dd>A block is divided into chunks, each comes with a checksum. + * We want transfers to be chunk-aligned, to be able to + * verify checksums. + * </dd> + * <dt>packet</dt> + * <dd>A grouping of chunks used for transport. It contains a + * header, followed by checksum data, followed by real data. + * </dd> + * </dl> + * Please see DataNode for the RPC specification. + * + * This is a new implementation introduced in Hadoop 0.23 which + * is more efficient and simpler than the older BlockReader + * implementation. It is renamed to BlockReaderRemote from BlockReaderRemote2. 
+ * */ @InterfaceAudience.Private -@Deprecated -public class BlockReaderRemote extends FSInputChecker implements BlockReader { - static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class); +public class BlockReaderRemote implements BlockReader { + + static final Logger LOG = LoggerFactory.getLogger(BlockReaderRemote.class); + static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB; + + final private Peer peer; + final private DatanodeID datanodeID; + final private PeerCache peerCache; + final private long blockId; + private final ReadableByteChannel in; - private final Peer peer; - private final DatanodeID datanodeID; - private final DataInputStream in; private DataChecksum checksum; + private final PacketReceiver packetReceiver = new PacketReceiver(true); + + private ByteBuffer curDataSlice = null; /** offset in block of the last chunk received */ - private long lastChunkOffset = -1; - private long lastChunkLen = -1; private long lastSeqNo = -1; /** offset in block where reader wants to actually read */ private long startOffset; - - private final long blockId; - - /** offset in block of of first chunk - may be less than startOffset - if startOffset is not chunk-aligned */ - private final long firstChunkOffset; + private final String filename; private final int bytesPerChecksum; private final int checksumSize; @@ -92,269 +115,182 @@ public class BlockReaderRemote extends FSInputChecker implements BlockReader { * This is the amount that the user has requested plus some padding * at the beginning so that the read can begin on a chunk boundary. */ - private final long bytesNeededToFinish; + private long bytesNeededToFinish; - private boolean eos = false; - private boolean sentStatusCode = false; - - ByteBuffer checksumBytes = null; - /** Amount of unread data in the current received packet */ - int dataLeft = 0; + private final boolean verifyChecksum; - private final PeerCache peerCache; + private boolean sentStatusCode = false; private final Tracer tracer; private final int networkDistance; - /* FSInputChecker interface */ + @VisibleForTesting + public Peer getPeer() { + return peer; + } - /* same interface as inputStream java.io.InputStream#read() - * used by DFSInputStream#read() - * This violates one rule when there is a checksum error: - * "Read should not modify user buffer before successful read" - * because it first reads the data to user buffer and then checks - * the checksum. - */ @Override public synchronized int read(byte[] buf, int off, int len) throws IOException { - - // This has to be set here, *before* the skip, since we can - // hit EOS during the skip, in the case that our entire read - // is smaller than the checksum chunk. - boolean eosBefore = eos; - - //for the first read, skip the extra bytes at the front. - if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { - // Skip these bytes. But don't call this.skip()! - int toSkip = (int)(startOffset - firstChunkOffset); - if ( super.readAndDiscard(toSkip) != toSkip ) { - // should never happen - throw new IOException("Could not skip required number of bytes"); + UUID randomId = (LOG.isTraceEnabled() ? 
UUID.randomUUID() : null); + LOG.trace("Starting read #{} file {} from datanode {}", + randomId, filename, datanodeID.getHostName()); + + if (curDataSlice == null || + curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + try (TraceScope ignored = tracer.newScope( + "BlockReaderRemote2#readNextPacket(" + blockId + ")")) { + readNextPacket(); } } - int nRead = super.read(buf, off, len); + LOG.trace("Finishing read #{}", randomId); - // if eos was set in the previous read, send a status code to the DN - if (eos && !eosBefore && nRead >= 0) { - if (needChecksum()) { - sendReadResult(peer, Status.CHECKSUM_OK); - } else { - sendReadResult(peer, Status.SUCCESS); - } + if (curDataSlice.remaining() == 0) { + // we're at EOF now + return -1; } + + int nRead = Math.min(curDataSlice.remaining(), len); + curDataSlice.get(buf, off, nRead); + return nRead; } + @Override - public synchronized long skip(long n) throws IOException { - /* How can we make sure we don't throw a ChecksumException, at least - * in majority of the cases?. This one throws. */ - long nSkipped = 0; - while (nSkipped < n) { - int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE); - int ret = readAndDiscard(toSkip); - if (ret <= 0) { - return nSkipped; + public synchronized int read(ByteBuffer buf) throws IOException { + if (curDataSlice == null || + (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) { + try (TraceScope ignored = tracer.newScope( + "BlockReaderRemote2#readNextPacket(" + blockId + ")")) { + readNextPacket(); } - nSkipped += ret; } - return nSkipped; - } + if (curDataSlice.remaining() == 0) { + // we're at EOF now + return -1; + } - @Override - public int read() throws IOException { - throw new IOException("read() is not expected to be invoked. " + - "Use read(buf, off, len) instead."); - } + int nRead = Math.min(curDataSlice.remaining(), buf.remaining()); + ByteBuffer writeSlice = curDataSlice.duplicate(); + writeSlice.limit(writeSlice.position() + nRead); + buf.put(writeSlice); + curDataSlice.position(writeSlice.position()); - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - /* Checksum errors are handled outside the BlockReader. - * DFSInputStream does not always call 'seekToNewSource'. In the - * case of pread(), it just tries a different replica without seeking. - */ - return false; + return nRead; } - @Override - public void seek(long pos) throws IOException { - throw new IOException("Seek() is not supported in BlockInputChecker"); - } + private void readNextPacket() throws IOException { + //Read packet headers. + packetReceiver.receiveNextPacket(in); - @Override - protected long getChunkPosition(long pos) { - throw new RuntimeException("getChunkPosition() is not supported, " + - "since seek is not required"); - } + PacketHeader curHeader = packetReceiver.getHeader(); + curDataSlice = packetReceiver.getDataSlice(); + assert curDataSlice.capacity() == curHeader.getDataLen(); - /** - * Makes sure that checksumBytes has enough capacity - * and limit is set to the number of checksum bytes needed - * to be read. 
- */ - private void adjustChecksumBytes(int dataLen) { - int requiredSize = - ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; - if (checksumBytes == null || requiredSize > checksumBytes.capacity()) { - checksumBytes = ByteBuffer.wrap(new byte[requiredSize]); - } else { - checksumBytes.clear(); - } - checksumBytes.limit(requiredSize); - } + LOG.trace("DFSClient readNextPacket got header {}", curHeader); - @Override - protected synchronized int readChunk(long pos, byte[] buf, int offset, - int len, byte[] checksumBuf) - throws IOException { - try (TraceScope ignored = tracer.newScope( - "BlockReaderRemote#readChunk(" + blockId + ")")) { - return readChunkImpl(pos, buf, offset, len, checksumBuf); + // Sanity check the lengths + if (!curHeader.sanityCheck(lastSeqNo)) { + throw new IOException("BlockReader: error in packet header " + + curHeader); } - } - private synchronized int readChunkImpl(long pos, byte[] buf, int offset, - int len, byte[] checksumBuf) - throws IOException { - // Read one chunk. - if (eos) { - // Already hit EOF - return -1; + if (curHeader.getDataLen() > 0) { + int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; + int checksumsLen = chunks * checksumSize; + + assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : + "checksum slice capacity=" + + packetReceiver.getChecksumSlice().capacity() + + " checksumsLen=" + checksumsLen; + + lastSeqNo = curHeader.getSeqno(); + if (verifyChecksum && curDataSlice.remaining() > 0) { + // N.B.: the checksum error offset reported here is actually + // relative to the start of the block, not the start of the file. + // This is slightly misleading, but preserves the behavior from + // the older BlockReader. + checksum.verifyChunkedSums(curDataSlice, + packetReceiver.getChecksumSlice(), + filename, curHeader.getOffsetInBlock()); + } + bytesNeededToFinish -= curHeader.getDataLen(); } - // Read one DATA_CHUNK. - long chunkOffset = lastChunkOffset; - if ( lastChunkLen > 0 ) { - chunkOffset += lastChunkLen; + // First packet will include some data prior to the first byte + // the user requested. Skip it. + if (curHeader.getOffsetInBlock() < startOffset) { + int newPos = (int) (startOffset - curHeader.getOffsetInBlock()); + curDataSlice.position(newPos); } - // pos is relative to the start of the first chunk of the read. - // chunkOffset is relative to the start of the block. - // This makes sure that the read passed from FSInputChecker is the - // for the same chunk we expect to be reading from the DN. - if ( (pos + firstChunkOffset) != chunkOffset ) { - throw new IOException("Mismatch in pos : " + pos + " + " + - firstChunkOffset + " != " + chunkOffset); + // If we've now satisfied the whole client read, read one last packet + // header, which should be empty + if (bytesNeededToFinish <= 0) { + readTrailingEmptyPacket(); + if (verifyChecksum) { + sendReadResult(Status.CHECKSUM_OK); + } else { + sendReadResult(Status.SUCCESS); + } } + } - // Read next packet if the previous packet has been read completely. - if (dataLeft <= 0) { - //Read packet headers. - PacketHeader header = new PacketHeader(); - header.readFields(in); - - LOG.debug("DFSClient readChunk got header {}", header); - - // Sanity check the lengths - if (!header.sanityCheck(lastSeqNo)) { - throw new IOException("BlockReader: error in packet header " + - header); + @Override + public synchronized long skip(long n) throws IOException { + /* How can we make sure we don't throw a ChecksumException, at least + * in majority of the cases?. 
This one throws. */ + long skipped = 0; + while (skipped < n) { + long needToSkip = n - skipped; + if (curDataSlice == null || + curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + readNextPacket(); } - - lastSeqNo = header.getSeqno(); - dataLeft = header.getDataLen(); - adjustChecksumBytes(header.getDataLen()); - if (header.getDataLen() > 0) { - IOUtils.readFully(in, checksumBytes.array(), 0, - checksumBytes.limit()); + if (curDataSlice.remaining() == 0) { + // we're at EOF now + break; } - } - - // Sanity checks - assert len >= bytesPerChecksum; - assert checksum != null; - assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0); - - - int checksumsToRead, bytesToRead; - if (checksumSize > 0) { - - // How many chunks left in our packet - this is a ceiling - // since we may have a partial chunk at the end of the file - int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1; - - // How many chunks we can fit in databuffer - // - note this is a floor since we always read full chunks - int chunksCanFit = Math.min(len / bytesPerChecksum, - checksumBuf.length / checksumSize); - - // How many chunks should we read - checksumsToRead = Math.min(chunksLeft, chunksCanFit); - // How many bytes should we actually read - bytesToRead = Math.min( - checksumsToRead * bytesPerChecksum, // full chunks - dataLeft); // in case we have a partial - } else { - // no checksum - bytesToRead = Math.min(dataLeft, len); - checksumsToRead = 0; - } - - if ( bytesToRead > 0 ) { - // Assert we have enough space - assert bytesToRead <= len; - assert checksumBytes.remaining() >= checksumSize * checksumsToRead; - assert checksumBuf.length >= checksumSize * checksumsToRead; - IOUtils.readFully(in, buf, offset, bytesToRead); - checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead); + int skip = (int)Math.min(curDataSlice.remaining(), needToSkip); + curDataSlice.position(curDataSlice.position() + skip); + skipped += skip; } + return skipped; + } - dataLeft -= bytesToRead; - assert dataLeft >= 0; - - lastChunkOffset = chunkOffset; - lastChunkLen = bytesToRead; - - // If there's no data left in the current packet after satisfying - // this read, and we have satisfied the client read, we expect - // an empty packet header from the DN to signify this. - // Note that pos + bytesToRead may in fact be greater since the - // DN finishes off the entire last chunk. - if (dataLeft == 0 && - pos + bytesToRead >= bytesNeededToFinish) { - - // Read header - PacketHeader hdr = new PacketHeader(); - hdr.readFields(in); + private void readTrailingEmptyPacket() throws IOException { + LOG.trace("Reading empty packet at end of read"); - if (!hdr.isLastPacketInBlock() || - hdr.getDataLen() != 0) { - throw new IOException("Expected empty end-of-read packet! Header: " + - hdr); - } + packetReceiver.receiveNextPacket(in); - eos = true; + PacketHeader trailer = packetReceiver.getHeader(); + if (!trailer.isLastPacketInBlock() || + trailer.getDataLen() != 0) { + throw new IOException("Expected empty end-of-read packet! 
Header: " + + trailer); } - - if ( bytesToRead == 0 ) { - return -1; - } - - return bytesToRead; } - private BlockReaderRemote(String file, String bpid, long blockId, - DataInputStream in, DataChecksum checksum, boolean verifyChecksum, - long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID, PeerCache peerCache, Tracer tracer, - int networkDistance) { + protected BlockReaderRemote(String file, long blockId, + DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, + long bytesToRead, Peer peer, + DatanodeID datanodeID, PeerCache peerCache, + Tracer tracer, + int networkDistance) { // Path is used only for printing block and file information in debug - super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId + - ":" + bpid + ":of:"+ file)/*too non path-like?*/, - 1, verifyChecksum, - checksum.getChecksumSize() > 0? checksum : null, - checksum.getBytesPerChecksum(), - checksum.getChecksumSize()); - this.peer = peer; this.datanodeID = datanodeID; - this.in = in; + this.in = peer.getInputStreamChannel(); this.checksum = checksum; + this.verifyChecksum = verifyChecksum; this.startOffset = Math.max( startOffset, 0 ); + this.filename = file; + this.peerCache = peerCache; this.blockId = blockId; // The total number of bytes that we need to transfer from the DN is @@ -362,18 +298,81 @@ public class BlockReaderRemote extends FSInputChecker implements BlockReader { // the beginning in order to chunk-align. Note that the DN may elect // to send more than this amount if the read starts/ends mid-chunk. this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); - - this.firstChunkOffset = firstChunkOffset; - lastChunkOffset = firstChunkOffset; - lastChunkLen = -1; - bytesPerChecksum = this.checksum.getBytesPerChecksum(); checksumSize = this.checksum.getChecksumSize(); - this.peerCache = peerCache; this.tracer = tracer; this.networkDistance = networkDistance; } + + @Override + public synchronized void close() throws IOException { + packetReceiver.close(); + startOffset = -1; + checksum = null; + if (peerCache != null && sentStatusCode) { + peerCache.put(datanodeID, peer); + } else { + peer.close(); + } + + // in will be closed when its Socket is closed. + } + + /** + * When the reader reaches end of the read, it sends a status response + * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN + * closing our connection (which we will re-open), but won't affect + * data correctness. + */ + void sendReadResult(Status statusCode) { + assert !sentStatusCode : "already sent status code to " + peer; + try { + writeReadResult(peer.getOutputStream(), statusCode); + sentStatusCode = true; + } catch (IOException e) { + // It's ok not to be able to send this. But something is probably wrong. + LOG.info("Could not send read status (" + statusCode + ") to datanode " + + peer.getRemoteAddressString() + ": " + e.getMessage()); + } + } + + /** + * Serialize the actual read result on the wire. 
+ */ + static void writeReadResult(OutputStream out, Status statusCode) + throws IOException { + + ClientReadStatusProto.newBuilder() + .setStatus(statusCode) + .build() + .writeDelimitedTo(out); + + out.flush(); + } + + /** + * File name to print when accessing a block directly (from servlets) + * @param s Address of the block location + * @param poolId Block pool ID of the block + * @param blockId Block ID of the block + * @return string that has a file name for debug purposes + */ + public static String getFileName(final InetSocketAddress s, + final String poolId, final long blockId) { + return s.toString() + ":" + poolId + ":" + blockId; + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public void readFully(byte[] buf, int off, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, off, len); + } + /** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. @@ -383,38 +382,37 @@ public class BlockReaderRemote extends FSInputChecker implements BlockReader { * @param blockToken The block token for security * @param startOffset The read offset, relative to block head * @param len The number of bytes to read - * @param bufferSize The IO buffer size (not the client buffer size) * @param verifyChecksum Whether to verify checksum * @param clientName Client name + * @param peer The Peer to use + * @param datanodeID The DatanodeID this peer is connected to * @return New BlockReader instance, or null on error. */ - public static BlockReaderRemote newBlockReader(String file, + public static BlockReader newBlockReader(String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, - int bufferSize, boolean verifyChecksum, - String clientName, Peer peer, - DatanodeID datanodeID, + boolean verifyChecksum, + String clientName, + Peer peer, DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy, - Tracer tracer, int networkDistance) - throws IOException { + Tracer tracer, + int networkDistance) throws IOException { // in and out will be closed when sock is closed (by the caller) - final DataOutputStream out = - new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); + final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( + peer.getOutputStream())); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy); // - // Get bytes in block, set streams + // Get bytes in block // - - DataInputStream in = new DataInputStream( - new BufferedInputStream(peer.getInputStream(), bufferSize)); + DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( PBHelperClient.vintPrefixed(in)); - BlockReaderRemote2.checkSuccess(status, peer, block, file); + checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( @@ -431,63 +429,29 @@ public class BlockReaderRemote extends FSInputChecker implements BlockReader { startOffset + " for file " + file); } - return new BlockReaderRemote(file, block.getBlockPoolId(), block.getBlockId(), - in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, - peer, datanodeID, peerCache, tracer, networkDistance); + return new BlockReaderRemote(file, block.getBlockId(), 
checksum, + verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, + peerCache, tracer, networkDistance); } - @Override - public synchronized void close() throws IOException { - startOffset = -1; - checksum = null; - if (peerCache != null & sentStatusCode) { - peerCache.put(datanodeID, peer); - } else { - peer.close(); - } - - // in will be closed when its Socket is closed. - } - - @Override - public void readFully(byte[] buf, int readOffset, int amtToRead) + static void checkSuccess( + BlockOpResponseProto status, Peer peer, + ExtendedBlock block, String file) throws IOException { - IOUtils.readFully(this, buf, readOffset, amtToRead); - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return readFully(this, buf, offset, len); - } - - /** - * When the reader reaches end of the read, it sends a status response - * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN - * closing our connection (which we will re-open), but won't affect - * data correctness. - */ - void sendReadResult(Peer peer, Status statusCode) { - assert !sentStatusCode : "already sent status code to " + peer; - try { - BlockReaderRemote2.writeReadResult(peer.getOutputStream(), statusCode); - sentStatusCode = true; - } catch (IOException e) { - // It's ok not to be able to send this. But something is probably wrong. - LOG.info("Could not send read status (" + statusCode + ") to datanode " + - peer.getRemoteAddressString() + ": " + e.getMessage()); - } - } - - @Override - public int read(ByteBuffer buf) throws IOException { - throw new UnsupportedOperationException("readDirect unsupported in BlockReaderRemote"); + String logInfo = "for OP_READ_BLOCK" + + ", self=" + peer.getLocalAddressString() + + ", remote=" + peer.getRemoteAddressString() + + ", for file " + file + + ", for pool " + block.getBlockPoolId() + + " block " + block.getBlockId() + "_" + block.getGenerationStamp(); + DataTransferProtoUtil.checkBlockOpStatus(status, logInfo); } @Override public int available() { // An optimistic estimate of how much data is available // to us without doing network I/O. - return BlockReaderRemote2.TCP_WINDOW_SIZE; + return TCP_WINDOW_SIZE; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b281bce/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java deleted file mode 100644 index ebdc3fd..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java +++ /dev/null @@ -1,474 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.client.impl; - -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.util.EnumSet; -import java.util.UUID; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.hdfs.BlockReader; -import org.apache.hadoop.hdfs.PeerCache; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.core.TraceScope; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.htrace.core.Tracer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a wrapper around connection to datanode - * and understands checksum, offset etc. - * - * Terminology: - * <dl> - * <dt>block</dt> - * <dd>The hdfs block, typically large (~64MB). - * </dd> - * <dt>chunk</dt> - * <dd>A block is divided into chunks, each comes with a checksum. - * We want transfers to be chunk-aligned, to be able to - * verify checksums. - * </dd> - * <dt>packet</dt> - * <dd>A grouping of chunks used for transport. It contains a - * header, followed by checksum data, followed by real data. - * </dd> - * </dl> - * Please see DataNode for the RPC specification. - * - * This is a new implementation introduced in Hadoop 0.23 which - * is more efficient and simpler than the older BlockReader - * implementation. It should be renamed to BlockReaderRemote - * once we are confident in it. 
- */ [email protected] -public class BlockReaderRemote2 implements BlockReader { - - static final Logger LOG = LoggerFactory.getLogger(BlockReaderRemote2.class); - static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB; - - final private Peer peer; - final private DatanodeID datanodeID; - final private PeerCache peerCache; - final private long blockId; - private final ReadableByteChannel in; - - private DataChecksum checksum; - private final PacketReceiver packetReceiver = new PacketReceiver(true); - - private ByteBuffer curDataSlice = null; - - /** offset in block of the last chunk received */ - private long lastSeqNo = -1; - - /** offset in block where reader wants to actually read */ - private long startOffset; - private final String filename; - - private final int bytesPerChecksum; - private final int checksumSize; - - /** - * The total number of bytes we need to transfer from the DN. - * This is the amount that the user has requested plus some padding - * at the beginning so that the read can begin on a chunk boundary. - */ - private long bytesNeededToFinish; - - private final boolean verifyChecksum; - - private boolean sentStatusCode = false; - - private final Tracer tracer; - - private final int networkDistance; - - @VisibleForTesting - public Peer getPeer() { - return peer; - } - - @Override - public synchronized int read(byte[] buf, int off, int len) - throws IOException { - UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null); - LOG.trace("Starting read #{} file {} from datanode {}", - randomId, filename, datanodeID.getHostName()); - - if (curDataSlice == null || - curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - try (TraceScope ignored = tracer.newScope( - "BlockReaderRemote2#readNextPacket(" + blockId + ")")) { - readNextPacket(); - } - } - - LOG.trace("Finishing read #{}", randomId); - - if (curDataSlice.remaining() == 0) { - // we're at EOF now - return -1; - } - - int nRead = Math.min(curDataSlice.remaining(), len); - curDataSlice.get(buf, off, nRead); - - return nRead; - } - - - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - if (curDataSlice == null || - (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) { - try (TraceScope ignored = tracer.newScope( - "BlockReaderRemote2#readNextPacket(" + blockId + ")")) { - readNextPacket(); - } - } - if (curDataSlice.remaining() == 0) { - // we're at EOF now - return -1; - } - - int nRead = Math.min(curDataSlice.remaining(), buf.remaining()); - ByteBuffer writeSlice = curDataSlice.duplicate(); - writeSlice.limit(writeSlice.position() + nRead); - buf.put(writeSlice); - curDataSlice.position(writeSlice.position()); - - return nRead; - } - - private void readNextPacket() throws IOException { - //Read packet headers. 
- packetReceiver.receiveNextPacket(in); - - PacketHeader curHeader = packetReceiver.getHeader(); - curDataSlice = packetReceiver.getDataSlice(); - assert curDataSlice.capacity() == curHeader.getDataLen(); - - LOG.trace("DFSClient readNextPacket got header {}", curHeader); - - // Sanity check the lengths - if (!curHeader.sanityCheck(lastSeqNo)) { - throw new IOException("BlockReader: error in packet header " + - curHeader); - } - - if (curHeader.getDataLen() > 0) { - int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; - int checksumsLen = chunks * checksumSize; - - assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : - "checksum slice capacity=" + - packetReceiver.getChecksumSlice().capacity() + - " checksumsLen=" + checksumsLen; - - lastSeqNo = curHeader.getSeqno(); - if (verifyChecksum && curDataSlice.remaining() > 0) { - // N.B.: the checksum error offset reported here is actually - // relative to the start of the block, not the start of the file. - // This is slightly misleading, but preserves the behavior from - // the older BlockReader. - checksum.verifyChunkedSums(curDataSlice, - packetReceiver.getChecksumSlice(), - filename, curHeader.getOffsetInBlock()); - } - bytesNeededToFinish -= curHeader.getDataLen(); - } - - // First packet will include some data prior to the first byte - // the user requested. Skip it. - if (curHeader.getOffsetInBlock() < startOffset) { - int newPos = (int) (startOffset - curHeader.getOffsetInBlock()); - curDataSlice.position(newPos); - } - - // If we've now satisfied the whole client read, read one last packet - // header, which should be empty - if (bytesNeededToFinish <= 0) { - readTrailingEmptyPacket(); - if (verifyChecksum) { - sendReadResult(Status.CHECKSUM_OK); - } else { - sendReadResult(Status.SUCCESS); - } - } - } - - @Override - public synchronized long skip(long n) throws IOException { - /* How can we make sure we don't throw a ChecksumException, at least - * in majority of the cases?. This one throws. */ - long skipped = 0; - while (skipped < n) { - long needToSkip = n - skipped; - if (curDataSlice == null || - curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - readNextPacket(); - } - if (curDataSlice.remaining() == 0) { - // we're at EOF now - break; - } - - int skip = (int)Math.min(curDataSlice.remaining(), needToSkip); - curDataSlice.position(curDataSlice.position() + skip); - skipped += skip; - } - return skipped; - } - - private void readTrailingEmptyPacket() throws IOException { - LOG.trace("Reading empty packet at end of read"); - - packetReceiver.receiveNextPacket(in); - - PacketHeader trailer = packetReceiver.getHeader(); - if (!trailer.isLastPacketInBlock() || - trailer.getDataLen() != 0) { - throw new IOException("Expected empty end-of-read packet! 
Header: " + - trailer); - } - } - - protected BlockReaderRemote2(String file, long blockId, - DataChecksum checksum, boolean verifyChecksum, - long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID, PeerCache peerCache, Tracer tracer, - int networkDistance) { - // Path is used only for printing block and file information in debug - this.peer = peer; - this.datanodeID = datanodeID; - this.in = peer.getInputStreamChannel(); - this.checksum = checksum; - this.verifyChecksum = verifyChecksum; - this.startOffset = Math.max( startOffset, 0 ); - this.filename = file; - this.peerCache = peerCache; - this.blockId = blockId; - - // The total number of bytes that we need to transfer from the DN is - // the amount that the user wants (bytesToRead), plus the padding at - // the beginning in order to chunk-align. Note that the DN may elect - // to send more than this amount if the read starts/ends mid-chunk. - this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); - bytesPerChecksum = this.checksum.getBytesPerChecksum(); - checksumSize = this.checksum.getChecksumSize(); - this.tracer = tracer; - this.networkDistance = networkDistance; - } - - - @Override - public synchronized void close() throws IOException { - packetReceiver.close(); - startOffset = -1; - checksum = null; - if (peerCache != null && sentStatusCode) { - peerCache.put(datanodeID, peer); - } else { - peer.close(); - } - - // in will be closed when its Socket is closed. - } - - /** - * When the reader reaches end of the read, it sends a status response - * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN - * closing our connection (which we will re-open), but won't affect - * data correctness. - */ - void sendReadResult(Status statusCode) { - assert !sentStatusCode : "already sent status code to " + peer; - try { - writeReadResult(peer.getOutputStream(), statusCode); - sentStatusCode = true; - } catch (IOException e) { - // It's ok not to be able to send this. But something is probably wrong. - LOG.info("Could not send read status (" + statusCode + ") to datanode " + - peer.getRemoteAddressString() + ": " + e.getMessage()); - } - } - - /** - * Serialize the actual read result on the wire. - */ - static void writeReadResult(OutputStream out, Status statusCode) - throws IOException { - - ClientReadStatusProto.newBuilder() - .setStatus(statusCode) - .build() - .writeDelimitedTo(out); - - out.flush(); - } - - /** - * File name to print when accessing a block directly (from servlets) - * @param s Address of the block location - * @param poolId Block pool ID of the block - * @param blockId Block ID of the block - * @return string that has a file name for debug purposes - */ - public static String getFileName(final InetSocketAddress s, - final String poolId, final long blockId) { - return s.toString() + ":" + poolId + ":" + blockId; - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return BlockReaderUtil.readAll(this, buf, offset, len); - } - - @Override - public void readFully(byte[] buf, int off, int len) throws IOException { - BlockReaderUtil.readFully(this, buf, off, len); - } - - /** - * Create a new BlockReader specifically to satisfy a read. - * This method also sends the OP_READ_BLOCK request. 
- * - * @param file File location - * @param block The block object - * @param blockToken The block token for security - * @param startOffset The read offset, relative to block head - * @param len The number of bytes to read - * @param verifyChecksum Whether to verify checksum - * @param clientName Client name - * @param peer The Peer to use - * @param datanodeID The DatanodeID this peer is connected to - * @return New BlockReader instance, or null on error. - */ - public static BlockReader newBlockReader(String file, - ExtendedBlock block, - Token<BlockTokenIdentifier> blockToken, - long startOffset, long len, - boolean verifyChecksum, - String clientName, - Peer peer, DatanodeID datanodeID, - PeerCache peerCache, - CachingStrategy cachingStrategy, - Tracer tracer, - int networkDistance) throws IOException { - // in and out will be closed when sock is closed (by the caller) - final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - peer.getOutputStream())); - new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, - verifyChecksum, cachingStrategy); - - // - // Get bytes in block - // - DataInputStream in = new DataInputStream(peer.getInputStream()); - - BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); - checkSuccess(status, peer, block, file); - ReadOpChecksumInfoProto checksumInfo = - status.getReadOpChecksumInfo(); - DataChecksum checksum = DataTransferProtoUtil.fromProto( - checksumInfo.getChecksum()); - //Warning when we get CHECKSUM_NULL? - - // Read the first chunk offset. - long firstChunkOffset = checksumInfo.getChunkOffset(); - - if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || - firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { - throw new IOException("BlockReader: error in first chunk offset (" + - firstChunkOffset + ") startOffset is " + - startOffset + " for file " + file); - } - - return new BlockReaderRemote2(file, block.getBlockId(), checksum, - verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, - peerCache, tracer, networkDistance); - } - - static void checkSuccess( - BlockOpResponseProto status, Peer peer, - ExtendedBlock block, String file) - throws IOException { - String logInfo = "for OP_READ_BLOCK" - + ", self=" + peer.getLocalAddressString() - + ", remote=" + peer.getRemoteAddressString() - + ", for file " + file - + ", for pool " + block.getBlockPoolId() - + " block " + block.getBlockId() + "_" + block.getGenerationStamp(); - DataTransferProtoUtil.checkBlockOpStatus(status, logInfo); - } - - @Override - public int available() { - // An optimistic estimate of how much data is available - // to us without doing network I/O. 
- return TCP_WINDOW_SIZE; - } - - @Override - public boolean isShortCircuit() { - return false; - } - - @Override - public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { - return null; - } - - @Override - public DataChecksum getDataChecksum() { - return checksum; - } - - @Override - public int getNetworkDistance() { - return networkDistance; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b281bce/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 883a5bb..ded989c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -63,10 +63,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSF import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; @@ -582,7 +580,6 @@ public class DfsClientConf { private final int socketCacheCapacity; private final long socketCacheExpiry; - private final boolean useLegacyBlockReader; private final boolean useLegacyBlockReaderLocal; private final String domainSocketPath; private final boolean skipShortCircuitChecksums; @@ -610,9 +607,6 @@ public class DfsClientConf { DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT); - useLegacyBlockReader = conf.getBoolean( - DFS_CLIENT_USE_LEGACY_BLOCKREADER, - DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT); useLegacyBlockReaderLocal = conf.getBoolean( DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT); @@ -700,12 +694,6 @@ public class DfsClientConf { public boolean isDomainSocketDataTraffic() { return domainSocketDataTraffic; } - /** - * @return the useLegacyBlockReader - */ - public boolean isUseLegacyBlockReader() { - return useLegacyBlockReader; - } /** * @return the skipShortCircuitChecksums http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b281bce/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java index 0379946..140c658 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; -import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote2; +import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -112,7 +112,7 @@ class StripedBlockReader { * * TODO: add proper tracer */ - return BlockReaderRemote2.newBlockReader( + return BlockReaderRemote.newBlockReader( "dummy", block, blockToken, offsetInBlock, block.getNumBytes() - offsetInBlock, true, "", newConnectedPeer(block, dnAddr, blockToken, source), source, http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b281bce/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java deleted file mode 100644 index 70ed428..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.client.impl; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.Random; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.BlockReader; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -abstract public class TestBlockReaderBase { - private BlockReaderTestUtil util; - private byte[] blockData; - private BlockReader reader; - - /** - * if override this, make sure return array length is less than - * block size. 
- */ - byte [] getBlockData() { - int length = 1 << 22; - byte[] data = new byte[length]; - for (int i = 0; i < length; i++) { - data[i] = (byte) (i % 133); - } - return data; - } - - private BlockReader getBlockReader(LocatedBlock block) throws Exception { - return util.getBlockReader(block, 0, blockData.length); - } - - abstract HdfsConfiguration createConf(); - - @Before - public void setup() throws Exception { - util = new BlockReaderTestUtil(1, createConf()); - blockData = getBlockData(); - DistributedFileSystem fs = util.getCluster().getFileSystem(); - Path testfile = new Path("/testfile"); - FSDataOutputStream fout = fs.create(testfile); - fout.write(blockData); - fout.close(); - LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0); - reader = getBlockReader(blk); - } - - @After - public void shutdown() throws Exception { - util.shutdown(); - } - - @Test(timeout=60000) - public void testSkip() throws IOException { - Random random = new Random(); - byte [] buf = new byte[1]; - for (int pos = 0; pos < blockData.length;) { - long skip = random.nextInt(100) + 1; - long skipped = reader.skip(skip); - if (pos + skip >= blockData.length) { - assertEquals(blockData.length, pos + skipped); - break; - } else { - assertEquals(skip, skipped); - pos += skipped; - assertEquals(1, reader.read(buf, 0, 1)); - - assertEquals(blockData[pos], buf[0]); - pos += 1; - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b281bce/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java index c30aac8..5638720 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java @@ -17,14 +17,82 @@ */ package org.apache.hadoop.hdfs.client.impl; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * This tests BlockReaderRemote. + */ +public class TestBlockReaderRemote { + private BlockReaderTestUtil util; + private byte[] blockData; + private BlockReader reader; + + /** + * if override this, make sure return array length is less than + * block size. 
+ */ + byte [] getBlockData() { + int length = 1 << 22; + byte[] data = new byte[length]; + for (int i = 0; i < length; i++) { + data[i] = (byte) (i % 133); + } + return data; + } + + private BlockReader getBlockReader(LocatedBlock block) throws Exception { + return util.getBlockReader(block, 0, blockData.length); + } + + @Before + public void setup() throws Exception { + util = new BlockReaderTestUtil(1, new HdfsConfiguration()); + blockData = getBlockData(); + DistributedFileSystem fs = util.getCluster().getFileSystem(); + Path testfile = new Path("/testfile"); + FSDataOutputStream fout = fs.create(testfile); + fout.write(blockData); + fout.close(); + LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0); + reader = getBlockReader(blk); + } + + @After + public void shutdown() throws Exception { + util.shutdown(); + } -public class TestBlockReaderRemote extends TestBlockReaderBase { + @Test(timeout=60000) + public void testSkip() throws IOException { + Random random = new Random(); + byte [] buf = new byte[1]; + for (int pos = 0; pos < blockData.length;) { + long skip = random.nextInt(100) + 1; + long skipped = reader.skip(skip); + if (pos + skip >= blockData.length) { + assertEquals(blockData.length, pos + skipped); + break; + } else { + assertEquals(skip, skipped); + pos += skipped; + assertEquals(1, reader.read(buf, 0, 1)); - HdfsConfiguration createConf() { - HdfsConfiguration conf = new HdfsConfiguration(); - conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true); - return conf; + assertEquals(blockData[pos], buf[0]); + pos += 1; + } + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b281bce/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java deleted file mode 100644 index 34a1539..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.client.impl; - -import org.apache.hadoop.hdfs.HdfsConfiguration; - -public class TestBlockReaderRemote2 extends TestBlockReaderBase { - HdfsConfiguration createConf() { - HdfsConfiguration conf = new HdfsConfiguration(); - return conf; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b281bce/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java index b6ec1bf..fc2de94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java @@ -42,7 +42,7 @@ public class TestClientBlockVerification { static LocatedBlock testBlock = null; static { - GenericTestUtils.setLogLevel(BlockReaderRemote2.LOG, Level.ALL); + GenericTestUtils.setLogLevel(BlockReaderRemote.LOG, Level.ALL); } @BeforeClass public static void setupCluster() throws Exception { @@ -58,7 +58,7 @@ public class TestClientBlockVerification { */ @Test public void testBlockVerification() throws Exception { - BlockReaderRemote2 reader = (BlockReaderRemote2)spy( + BlockReaderRemote reader = (BlockReaderRemote)spy( util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); verify(reader).sendReadResult(Status.CHECKSUM_OK); @@ -70,7 +70,7 @@ public class TestClientBlockVerification { */ @Test public void testIncompleteRead() throws Exception { - BlockReaderRemote2 reader = (BlockReaderRemote2)spy( + BlockReaderRemote reader = (BlockReaderRemote)spy( util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false); @@ -88,7 +88,7 @@ public class TestClientBlockVerification { @Test public void testCompletePartialRead() throws Exception { // Ask for half the file - BlockReaderRemote2 reader = (BlockReaderRemote2)spy( + BlockReaderRemote reader = (BlockReaderRemote)spy( util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2)); // And read half the file util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); @@ -108,7 +108,7 @@ public class TestClientBlockVerification { for (int length : lengths) { DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " + " len=" + length); - BlockReaderRemote2 reader = (BlockReaderRemote2)spy( + BlockReaderRemote reader = (BlockReaderRemote)spy( util.getBlockReader(testBlock, startOffset, length)); util.readAndCheckEOS(reader, length, true); verify(reader).sendReadResult(Status.CHECKSUM_OK); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b281bce/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java index 067b8ce..a7132b8 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java @@ -586,9 +586,9 @@ public class TestShortCircuitLocalRead { } @Test(timeout=60000) - public void testReadWithRemoteBlockReader() + public void testReadWithRemoteBlockReader2() throws IOException, InterruptedException { - doTestShortCircuitReadWithRemoteBlockReader(true, 3 * blockSize + 100, + doTestShortCircuitReadWithRemoteBlockReader2(3 * blockSize + 100, getCurrentUser(), 0, false); } @@ -597,13 +597,11 @@ public class TestShortCircuitLocalRead { * through BlockReaderRemote * @throws IOException */ - public void doTestShortCircuitReadWithRemoteBlockReader( - boolean ignoreChecksum, int size, String shortCircuitUser, + public void doTestShortCircuitReadWithRemoteBlockReader2( + int size, String shortCircuitUser, int readOffset, boolean shortCircuitFails) throws IOException, InterruptedException { Configuration conf = new Configuration(); - conf.setBoolean( - HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) @@ -624,9 +622,9 @@ public class TestShortCircuitLocalRead { try { checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, conf, shortCircuitFails); - //BlockReaderRemote have unsupported method read(ByteBuffer bf) - assertTrue( - "BlockReaderRemote unsupported method read(ByteBuffer bf) error", + //BlockReaderRemote2 have unsupported method read(ByteBuffer bf) + assertFalse( + "BlockReaderRemote2 unsupported method read(ByteBuffer bf) error", checkUnsupportedMethod(fs, file1, fileData, readOffset)); } catch(IOException e) { throw new IOException(
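
Editor's note on the API change: after this patch there is a single remote-reader entry point, BlockReaderRemote.newBlockReader, with no bufferSize argument and no legacy/non-legacy switch. The sketch below mirrors the call sites updated above (BlockReaderFactory#getRemoteBlockReader and StripedBlockReader); the wrapper class and method here are illustrative only, not part of the patch, and it assumes the caller already holds a connected Peer, a block token, and the usual client context objects.

import java.io.IOException;

import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.PeerCache;
import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.security.token.Token;
import org.apache.htrace.core.Tracer;

/** Illustrative wrapper; not part of the patch. */
public class RemoteReadSketch {
  static BlockReader openRemoteReader(String fileName,
      ExtendedBlock block,
      Token<BlockTokenIdentifier> token,
      long startOffset, long length,
      String clientName,
      Peer peer, DatanodeID datanode,
      PeerCache peerCache,
      CachingStrategy cachingStrategy,
      Tracer tracer,
      int networkDistance) throws IOException {
    // Single code path after HDFS-10548: no bufferSize argument and no
    // legacy-reader branch; the packet-based reader reads straight from
    // the peer's input channel.
    return BlockReaderRemote.newBlockReader(
        fileName, block, token, startOffset, length,
        true /* verifyChecksum */, clientName, peer, datanode,
        peerCache, cachingStrategy, tracer, networkDistance);
  }
}

Note also that DfsClientConf no longer reads HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER; only the DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL setting for the short-circuit local path survives this change.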

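readNextPacket in the retained reader sizes the checksum slice for each packet with a ceiling division over bytesPerChecksum, so a short final chunk still gets a full checksum entry. A small self-contained illustration of that arithmetic follows; the class and method names are hypothetical stand-ins rather than HDFS APIs, and the 512-byte chunk / 4-byte checksum figures are example values only.

/**
 * Stand-in illustration of the per-packet checksum sizing done in
 * BlockReaderRemote#readNextPacket; names here are hypothetical, not HDFS APIs.
 */
public class PacketChecksumMath {

  static int checksumBytesFor(int packetDataLen, int bytesPerChecksum,
      int checksumSize) {
    if (packetDataLen == 0) {
      return 0; // the trailing end-of-read packet carries no data or checksums
    }
    // Ceiling division: a short final chunk still gets a full checksum entry.
    int chunks = 1 + (packetDataLen - 1) / bytesPerChecksum;
    return chunks * checksumSize;
  }

  public static void main(String[] args) {
    // Example values only: a 64 KB packet, 512-byte chunks, 4-byte checksums.
    System.out.println(checksumBytesFor(64 * 1024, 512, 4)); // prints 512
  }
}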