http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java new file mode 100644 index 0000000..22d4e23 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java @@ -0,0 +1,512 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.FSInputChecker; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.PeerCache; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * @deprecated this is an old implementation that is being left around + * in case any issues spring up with the new {@link BlockReaderRemote2} + * implementation. + * It will be removed in the next release. 
+ */ +@InterfaceAudience.Private +@Deprecated +public class BlockReaderRemote extends FSInputChecker implements BlockReader { + static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class); + + private final Peer peer; + private final DatanodeID datanodeID; + private final DataInputStream in; + private DataChecksum checksum; + + /** offset in block of the last chunk received */ + private long lastChunkOffset = -1; + private long lastChunkLen = -1; + private long lastSeqNo = -1; + + /** offset in block where reader wants to actually read */ + private long startOffset; + + private final long blockId; + + /** offset in block of of first chunk - may be less than startOffset + if startOffset is not chunk-aligned */ + private final long firstChunkOffset; + + private final int bytesPerChecksum; + private final int checksumSize; + + /** + * The total number of bytes we need to transfer from the DN. + * This is the amount that the user has requested plus some padding + * at the beginning so that the read can begin on a chunk boundary. + */ + private final long bytesNeededToFinish; + + private boolean eos = false; + private boolean sentStatusCode = false; + + ByteBuffer checksumBytes = null; + /** Amount of unread data in the current received packet */ + int dataLeft = 0; + + private final PeerCache peerCache; + + private final Tracer tracer; + + private final int networkDistance; + + /* FSInputChecker interface */ + + /* same interface as inputStream java.io.InputStream#read() + * used by DFSInputStream#read() + * This violates one rule when there is a checksum error: + * "Read should not modify user buffer before successful read" + * because it first reads the data to user buffer and then checks + * the checksum. + */ + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + + // This has to be set here, *before* the skip, since we can + // hit EOS during the skip, in the case that our entire read + // is smaller than the checksum chunk. + boolean eosBefore = eos; + + //for the first read, skip the extra bytes at the front. + if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { + // Skip these bytes. But don't call this.skip()! + int toSkip = (int)(startOffset - firstChunkOffset); + if ( super.readAndDiscard(toSkip) != toSkip ) { + // should never happen + throw new IOException("Could not skip required number of bytes"); + } + } + + int nRead = super.read(buf, off, len); + + // if eos was set in the previous read, send a status code to the DN + if (eos && !eosBefore && nRead >= 0) { + if (needChecksum()) { + sendReadResult(peer, Status.CHECKSUM_OK); + } else { + sendReadResult(peer, Status.SUCCESS); + } + } + return nRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + /* How can we make sure we don't throw a ChecksumException, at least + * in majority of the cases?. This one throws. */ + long nSkipped = 0; + while (nSkipped < n) { + int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE); + int ret = readAndDiscard(toSkip); + if (ret <= 0) { + return nSkipped; + } + nSkipped += ret; + } + return nSkipped; + } + + @Override + public int read() throws IOException { + throw new IOException("read() is not expected to be invoked. " + + "Use read(buf, off, len) instead."); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + /* Checksum errors are handled outside the BlockReader. + * DFSInputStream does not always call 'seekToNewSource'.
In the + * case of pread(), it just tries a different replica without seeking. + */ + return false; + } + + @Override + public void seek(long pos) throws IOException { + throw new IOException("Seek() is not supported in BlockInputChecker"); + } + + @Override + protected long getChunkPosition(long pos) { + throw new RuntimeException("getChunkPosition() is not supported, " + + "since seek is not required"); + } + + /** + * Makes sure that checksumBytes has enough capacity + * and limit is set to the number of checksum bytes needed + * to be read. + */ + private void adjustChecksumBytes(int dataLen) { + int requiredSize = + ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; + if (checksumBytes == null || requiredSize > checksumBytes.capacity()) { + checksumBytes = ByteBuffer.wrap(new byte[requiredSize]); + } else { + checksumBytes.clear(); + } + checksumBytes.limit(requiredSize); + } + + @Override + protected synchronized int readChunk(long pos, byte[] buf, int offset, + int len, byte[] checksumBuf) + throws IOException { + try (TraceScope ignored = tracer.newScope( + "BlockReaderRemote#readChunk(" + blockId + ")")) { + return readChunkImpl(pos, buf, offset, len, checksumBuf); + } + } + + private synchronized int readChunkImpl(long pos, byte[] buf, int offset, + int len, byte[] checksumBuf) + throws IOException { + // Read one chunk. + if (eos) { + // Already hit EOF + return -1; + } + + // Read one DATA_CHUNK. + long chunkOffset = lastChunkOffset; + if ( lastChunkLen > 0 ) { + chunkOffset += lastChunkLen; + } + + // pos is relative to the start of the first chunk of the read. + // chunkOffset is relative to the start of the block. + // This makes sure that the read passed from FSInputChecker is the + // for the same chunk we expect to be reading from the DN. + if ( (pos + firstChunkOffset) != chunkOffset ) { + throw new IOException("Mismatch in pos : " + pos + " + " + + firstChunkOffset + " != " + chunkOffset); + } + + // Read next packet if the previous packet has been read completely. + if (dataLeft <= 0) { + //Read packet headers. 
+ PacketHeader header = new PacketHeader(); + header.readFields(in); + + LOG.debug("DFSClient readChunk got header {}", header); + + // Sanity check the lengths + if (!header.sanityCheck(lastSeqNo)) { + throw new IOException("BlockReader: error in packet header " + + header); + } + + lastSeqNo = header.getSeqno(); + dataLeft = header.getDataLen(); + adjustChecksumBytes(header.getDataLen()); + if (header.getDataLen() > 0) { + IOUtils.readFully(in, checksumBytes.array(), 0, + checksumBytes.limit()); + } + } + + // Sanity checks + assert len >= bytesPerChecksum; + assert checksum != null; + assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0); + + + int checksumsToRead, bytesToRead; + + if (checksumSize > 0) { + + // How many chunks left in our packet - this is a ceiling + // since we may have a partial chunk at the end of the file + int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1; + + // How many chunks we can fit in databuffer + // - note this is a floor since we always read full chunks + int chunksCanFit = Math.min(len / bytesPerChecksum, + checksumBuf.length / checksumSize); + + // How many chunks should we read + checksumsToRead = Math.min(chunksLeft, chunksCanFit); + // How many bytes should we actually read + bytesToRead = Math.min( + checksumsToRead * bytesPerChecksum, // full chunks + dataLeft); // in case we have a partial + } else { + // no checksum + bytesToRead = Math.min(dataLeft, len); + checksumsToRead = 0; + } + + if ( bytesToRead > 0 ) { + // Assert we have enough space + assert bytesToRead <= len; + assert checksumBytes.remaining() >= checksumSize * checksumsToRead; + assert checksumBuf.length >= checksumSize * checksumsToRead; + IOUtils.readFully(in, buf, offset, bytesToRead); + checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead); + } + + dataLeft -= bytesToRead; + assert dataLeft >= 0; + + lastChunkOffset = chunkOffset; + lastChunkLen = bytesToRead; + + // If there's no data left in the current packet after satisfying + // this read, and we have satisfied the client read, we expect + // an empty packet header from the DN to signify this. + // Note that pos + bytesToRead may in fact be greater since the + // DN finishes off the entire last chunk. + if (dataLeft == 0 && + pos + bytesToRead >= bytesNeededToFinish) { + + // Read header + PacketHeader hdr = new PacketHeader(); + hdr.readFields(in); + + if (!hdr.isLastPacketInBlock() || + hdr.getDataLen() != 0) { + throw new IOException("Expected empty end-of-read packet! Header: " + + hdr); + } + + eos = true; + } + + if ( bytesToRead == 0 ) { + return -1; + } + + return bytesToRead; + } + + private BlockReaderRemote(String file, String bpid, long blockId, + DataInputStream in, DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, + DatanodeID datanodeID, PeerCache peerCache, Tracer tracer, + int networkDistance) { + // Path is used only for printing block and file information in debug + super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId + + ":" + bpid + ":of:"+ file)/*too non path-like?*/, + 1, verifyChecksum, + checksum.getChecksumSize() > 0? 
checksum : null, + checksum.getBytesPerChecksum(), + checksum.getChecksumSize()); + + this.peer = peer; + this.datanodeID = datanodeID; + this.in = in; + this.checksum = checksum; + this.startOffset = Math.max( startOffset, 0 ); + this.blockId = blockId; + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. + this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); + + this.firstChunkOffset = firstChunkOffset; + lastChunkOffset = firstChunkOffset; + lastChunkLen = -1; + + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + this.peerCache = peerCache; + this.tracer = tracer; + this.networkDistance = networkDistance; + } + + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. + * + * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param bufferSize The IO buffer size (not the client buffer size) + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @return New BlockReader instance, or null on error. + */ + public static BlockReaderRemote newBlockReader(String file, + ExtendedBlock block, + Token<BlockTokenIdentifier> blockToken, + long startOffset, long len, + int bufferSize, boolean verifyChecksum, + String clientName, Peer peer, + DatanodeID datanodeID, + PeerCache peerCache, + CachingStrategy cachingStrategy, + Tracer tracer, int networkDistance) + throws IOException { + // in and out will be closed when sock is closed (by the caller) + final DataOutputStream out = + new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); + new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, + verifyChecksum, cachingStrategy); + + // + // Get bytes in block, set streams + // + + DataInputStream in = new DataInputStream( + new BufferedInputStream(peer.getInputStream(), bufferSize)); + + BlockOpResponseProto status = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(in)); + BlockReaderRemote2.checkSuccess(status, peer, block, file); + ReadOpChecksumInfoProto checksumInfo = + status.getReadOpChecksumInfo(); + DataChecksum checksum = DataTransferProtoUtil.fromProto( + checksumInfo.getChecksum()); + //Warning when we get CHECKSUM_NULL? + + // Read the first chunk offset. + long firstChunkOffset = checksumInfo.getChunkOffset(); + + if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || + firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { + throw new IOException("BlockReader: error in first chunk offset (" + + firstChunkOffset + ") startOffset is " + + startOffset + " for file " + file); + } + + return new BlockReaderRemote(file, block.getBlockPoolId(), block.getBlockId(), + in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, + peer, datanodeID, peerCache, tracer, networkDistance); + } + + @Override + public synchronized void close() throws IOException { + startOffset = -1; + checksum = null; + if (peerCache != null & sentStatusCode) { + peerCache.put(datanodeID, peer); + } else { + peer.close(); + } + + // in will be closed when its Socket is closed. 
+ } + + @Override + public void readFully(byte[] buf, int readOffset, int amtToRead) + throws IOException { + IOUtils.readFully(this, buf, readOffset, amtToRead); + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return readFully(this, buf, offset, len); + } + + /** + * When the reader reaches end of the read, it sends a status response + * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN + * closing our connection (which we will re-open), but won't affect + * data correctness. + */ + void sendReadResult(Peer peer, Status statusCode) { + assert !sentStatusCode : "already sent status code to " + peer; + try { + BlockReaderRemote2.writeReadResult(peer.getOutputStream(), statusCode); + sentStatusCode = true; + } catch (IOException e) { + // It's ok not to be able to send this. But something is probably wrong. + LOG.info("Could not send read status (" + statusCode + ") to datanode " + + peer.getRemoteAddressString() + ": " + e.getMessage()); + } + } + + @Override + public int read(ByteBuffer buf) throws IOException { + throw new UnsupportedOperationException("readDirect unsupported in BlockReaderRemote"); + } + + @Override + public int available() { + // An optimistic estimate of how much data is available + // to us without doing network I/O. + return BlockReaderRemote2.TCP_WINDOW_SIZE; + } + + @Override + public boolean isShortCircuit() { + return false; + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + return null; + } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } + + @Override + public int getNetworkDistance() { + return networkDistance; + } +}
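The chunk-alignment padding and checksum-buffer sizing that BlockReaderRemote performs above reduce to two small pieces of arithmetic: pad the requested offset back to the previous chunk boundary so the first chunk's checksum can still be verified, and size the per-packet checksum region with a ceiling division over the chunk size. A minimal, self-contained sketch of that arithmetic (not part of the patch; the class name and sample values are illustrative, with 512-byte chunks and 4-byte checksums matching common HDFS defaults):

public class ChunkMathSketch {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;  // checksum chunk size
    int checksumSize = 4;        // bytes per checksum (e.g. CRC32C)

    // Padding a mid-chunk read back to the chunk boundary, as done when
    // computing bytesNeededToFinish in the reader's constructor.
    long startOffset = 1300;      // offset the caller asked for
    long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum); // 1024
    long bytesToRead = 4000;
    long bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
    System.out.println("bytes to transfer from the DN = " + bytesNeededToFinish); // 4276

    // Sizing the checksum region of one packet: one checksum per (possibly
    // partial) chunk, i.e. a ceiling division, as in adjustChecksumBytes().
    int dataLen = 3000;
    int chunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum; // 6
    System.out.println("checksum bytes for the packet = " + chunks * checksumSize); // 24
  }
}

BlockReaderRemote2 below computes the same bytesNeededToFinish padding in its constructor; in both readers the datanode may still send slightly more than this amount if the read ends mid-chunk.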
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java new file mode 100644 index 0000000..ebdc3fd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java @@ -0,0 +1,474 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.EnumSet; +import java.util.UUID; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.PeerCache; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.htrace.core.TraceScope; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.htrace.core.Tracer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a wrapper around connection to datanode + * and understands checksum, offset etc. 
+ * + * Terminology: + * <dl> + * <dt>block</dt> + * <dd>The hdfs block, typically large (~64MB). + * </dd> + * <dt>chunk</dt> + * <dd>A block is divided into chunks, each comes with a checksum. + * We want transfers to be chunk-aligned, to be able to + * verify checksums. + * </dd> + * <dt>packet</dt> + * <dd>A grouping of chunks used for transport. It contains a + * header, followed by checksum data, followed by real data. + * </dd> + * </dl> + * Please see DataNode for the RPC specification. + * + * This is a new implementation introduced in Hadoop 0.23 which + * is more efficient and simpler than the older BlockReader + * implementation. It should be renamed to BlockReaderRemote + * once we are confident in it. + */ +@InterfaceAudience.Private +public class BlockReaderRemote2 implements BlockReader { + + static final Logger LOG = LoggerFactory.getLogger(BlockReaderRemote2.class); + static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB; + + final private Peer peer; + final private DatanodeID datanodeID; + final private PeerCache peerCache; + final private long blockId; + private final ReadableByteChannel in; + + private DataChecksum checksum; + private final PacketReceiver packetReceiver = new PacketReceiver(true); + + private ByteBuffer curDataSlice = null; + + /** offset in block of the last chunk received */ + private long lastSeqNo = -1; + + /** offset in block where reader wants to actually read */ + private long startOffset; + private final String filename; + + private final int bytesPerChecksum; + private final int checksumSize; + + /** + * The total number of bytes we need to transfer from the DN. + * This is the amount that the user has requested plus some padding + * at the beginning so that the read can begin on a chunk boundary. + */ + private long bytesNeededToFinish; + + private final boolean verifyChecksum; + + private boolean sentStatusCode = false; + + private final Tracer tracer; + + private final int networkDistance; + + @VisibleForTesting + public Peer getPeer() { + return peer; + } + + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null); + LOG.trace("Starting read #{} file {} from datanode {}", + randomId, filename, datanodeID.getHostName()); + + if (curDataSlice == null || + curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + try (TraceScope ignored = tracer.newScope( + "BlockReaderRemote2#readNextPacket(" + blockId + ")")) { + readNextPacket(); + } + } + + LOG.trace("Finishing read #{}", randomId); + + if (curDataSlice.remaining() == 0) { + // we're at EOF now + return -1; + } + + int nRead = Math.min(curDataSlice.remaining(), len); + curDataSlice.get(buf, off, nRead); + + return nRead; + } + + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + if (curDataSlice == null || + (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) { + try (TraceScope ignored = tracer.newScope( + "BlockReaderRemote2#readNextPacket(" + blockId + ")")) { + readNextPacket(); + } + } + if (curDataSlice.remaining() == 0) { + // we're at EOF now + return -1; + } + + int nRead = Math.min(curDataSlice.remaining(), buf.remaining()); + ByteBuffer writeSlice = curDataSlice.duplicate(); + writeSlice.limit(writeSlice.position() + nRead); + buf.put(writeSlice); + curDataSlice.position(writeSlice.position()); + + return nRead; + } + + private void readNextPacket() throws IOException { + //Read packet headers.
+ packetReceiver.receiveNextPacket(in); + + PacketHeader curHeader = packetReceiver.getHeader(); + curDataSlice = packetReceiver.getDataSlice(); + assert curDataSlice.capacity() == curHeader.getDataLen(); + + LOG.trace("DFSClient readNextPacket got header {}", curHeader); + + // Sanity check the lengths + if (!curHeader.sanityCheck(lastSeqNo)) { + throw new IOException("BlockReader: error in packet header " + + curHeader); + } + + if (curHeader.getDataLen() > 0) { + int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; + int checksumsLen = chunks * checksumSize; + + assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : + "checksum slice capacity=" + + packetReceiver.getChecksumSlice().capacity() + + " checksumsLen=" + checksumsLen; + + lastSeqNo = curHeader.getSeqno(); + if (verifyChecksum && curDataSlice.remaining() > 0) { + // N.B.: the checksum error offset reported here is actually + // relative to the start of the block, not the start of the file. + // This is slightly misleading, but preserves the behavior from + // the older BlockReader. + checksum.verifyChunkedSums(curDataSlice, + packetReceiver.getChecksumSlice(), + filename, curHeader.getOffsetInBlock()); + } + bytesNeededToFinish -= curHeader.getDataLen(); + } + + // First packet will include some data prior to the first byte + // the user requested. Skip it. + if (curHeader.getOffsetInBlock() < startOffset) { + int newPos = (int) (startOffset - curHeader.getOffsetInBlock()); + curDataSlice.position(newPos); + } + + // If we've now satisfied the whole client read, read one last packet + // header, which should be empty + if (bytesNeededToFinish <= 0) { + readTrailingEmptyPacket(); + if (verifyChecksum) { + sendReadResult(Status.CHECKSUM_OK); + } else { + sendReadResult(Status.SUCCESS); + } + } + } + + @Override + public synchronized long skip(long n) throws IOException { + /* How can we make sure we don't throw a ChecksumException, at least + * in majority of the cases?. This one throws. */ + long skipped = 0; + while (skipped < n) { + long needToSkip = n - skipped; + if (curDataSlice == null || + curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + readNextPacket(); + } + if (curDataSlice.remaining() == 0) { + // we're at EOF now + break; + } + + int skip = (int)Math.min(curDataSlice.remaining(), needToSkip); + curDataSlice.position(curDataSlice.position() + skip); + skipped += skip; + } + return skipped; + } + + private void readTrailingEmptyPacket() throws IOException { + LOG.trace("Reading empty packet at end of read"); + + packetReceiver.receiveNextPacket(in); + + PacketHeader trailer = packetReceiver.getHeader(); + if (!trailer.isLastPacketInBlock() || + trailer.getDataLen() != 0) { + throw new IOException("Expected empty end-of-read packet! 
Header: " + + trailer); + } + } + + protected BlockReaderRemote2(String file, long blockId, + DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, + DatanodeID datanodeID, PeerCache peerCache, Tracer tracer, + int networkDistance) { + // Path is used only for printing block and file information in debug + this.peer = peer; + this.datanodeID = datanodeID; + this.in = peer.getInputStreamChannel(); + this.checksum = checksum; + this.verifyChecksum = verifyChecksum; + this.startOffset = Math.max( startOffset, 0 ); + this.filename = file; + this.peerCache = peerCache; + this.blockId = blockId; + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. + this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + this.tracer = tracer; + this.networkDistance = networkDistance; + } + + + @Override + public synchronized void close() throws IOException { + packetReceiver.close(); + startOffset = -1; + checksum = null; + if (peerCache != null && sentStatusCode) { + peerCache.put(datanodeID, peer); + } else { + peer.close(); + } + + // in will be closed when its Socket is closed. + } + + /** + * When the reader reaches end of the read, it sends a status response + * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN + * closing our connection (which we will re-open), but won't affect + * data correctness. + */ + void sendReadResult(Status statusCode) { + assert !sentStatusCode : "already sent status code to " + peer; + try { + writeReadResult(peer.getOutputStream(), statusCode); + sentStatusCode = true; + } catch (IOException e) { + // It's ok not to be able to send this. But something is probably wrong. + LOG.info("Could not send read status (" + statusCode + ") to datanode " + + peer.getRemoteAddressString() + ": " + e.getMessage()); + } + } + + /** + * Serialize the actual read result on the wire. + */ + static void writeReadResult(OutputStream out, Status statusCode) + throws IOException { + + ClientReadStatusProto.newBuilder() + .setStatus(statusCode) + .build() + .writeDelimitedTo(out); + + out.flush(); + } + + /** + * File name to print when accessing a block directly (from servlets) + * @param s Address of the block location + * @param poolId Block pool ID of the block + * @param blockId Block ID of the block + * @return string that has a file name for debug purposes + */ + public static String getFileName(final InetSocketAddress s, + final String poolId, final long blockId) { + return s.toString() + ":" + poolId + ":" + blockId; + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public void readFully(byte[] buf, int off, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, off, len); + } + + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. 
+ * + * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @param peer The Peer to use + * @param datanodeID The DatanodeID this peer is connected to + * @return New BlockReader instance, or null on error. + */ + public static BlockReader newBlockReader(String file, + ExtendedBlock block, + Token<BlockTokenIdentifier> blockToken, + long startOffset, long len, + boolean verifyChecksum, + String clientName, + Peer peer, DatanodeID datanodeID, + PeerCache peerCache, + CachingStrategy cachingStrategy, + Tracer tracer, + int networkDistance) throws IOException { + // in and out will be closed when sock is closed (by the caller) + final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( + peer.getOutputStream())); + new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, + verifyChecksum, cachingStrategy); + + // + // Get bytes in block + // + DataInputStream in = new DataInputStream(peer.getInputStream()); + + BlockOpResponseProto status = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(in)); + checkSuccess(status, peer, block, file); + ReadOpChecksumInfoProto checksumInfo = + status.getReadOpChecksumInfo(); + DataChecksum checksum = DataTransferProtoUtil.fromProto( + checksumInfo.getChecksum()); + //Warning when we get CHECKSUM_NULL? + + // Read the first chunk offset. + long firstChunkOffset = checksumInfo.getChunkOffset(); + + if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || + firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { + throw new IOException("BlockReader: error in first chunk offset (" + + firstChunkOffset + ") startOffset is " + + startOffset + " for file " + file); + } + + return new BlockReaderRemote2(file, block.getBlockId(), checksum, + verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, + peerCache, tracer, networkDistance); + } + + static void checkSuccess( + BlockOpResponseProto status, Peer peer, + ExtendedBlock block, String file) + throws IOException { + String logInfo = "for OP_READ_BLOCK" + + ", self=" + peer.getLocalAddressString() + + ", remote=" + peer.getRemoteAddressString() + + ", for file " + file + + ", for pool " + block.getBlockPoolId() + + " block " + block.getBlockId() + "_" + block.getGenerationStamp(); + DataTransferProtoUtil.checkBlockOpStatus(status, logInfo); + } + + @Override + public int available() { + // An optimistic estimate of how much data is available + // to us without doing network I/O. 
+ return TCP_WINDOW_SIZE; + } + + @Override + public boolean isShortCircuit() { + return false; + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + return null; + } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } + + @Override + public int getNetworkDistance() { + return networkDistance; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java new file mode 100644 index 0000000..57fbe47 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.BlockReader; + +import java.io.IOException; + +/** + * For sharing between the local and remote block reader implementations. + */ +@InterfaceAudience.Private +class BlockReaderUtil { + + /* See {@link BlockReader#readAll(byte[], int, int)} */ + public static int readAll(BlockReader reader, + byte[] buf, int offset, int len) throws IOException { + int n = 0; + for (;;) { + int nread = reader.read(buf, offset + n, len - n); + if (nread <= 0) + return (n == 0) ?
nread : n; + n += nread; + if (n >= len) + return n; + } + } + + /* See {@link BlockReader#readFully(byte[], int, int)} */ + public static void readFully(BlockReader reader, + byte[] buf, int off, int len) throws IOException { + int toRead = len; + while (toRead > 0) { + int ret = reader.read(buf, off, toRead); + if (ret < 0) { + throw new IOException("Premature EOF from inputStream"); + } + toRead -= ret; + off += ret; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java new file mode 100644 index 0000000..3d6d9a4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.ReplicaAccessor; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.util.DataChecksum; + +/** + * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from + * replicas. + */ +@InterfaceAudience.Private +public final class ExternalBlockReader implements BlockReader { + private final ReplicaAccessor accessor; + private final long visibleLength; + private long pos; + + ExternalBlockReader(ReplicaAccessor accessor, long visibleLength, + long startOffset) { + this.accessor = accessor; + this.visibleLength = visibleLength; + this.pos = startOffset; + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + int nread = accessor.read(pos, buf, off, len); + if (nread < 0) { + return nread; + } + pos += nread; + return nread; + } + + @Override + public int read(ByteBuffer buf) throws IOException { + int nread = accessor.read(pos, buf); + if (nread < 0) { + return nread; + } + pos += nread; + return nread; + } + + @Override + public long skip(long n) throws IOException { + // You cannot skip backwards + if (n <= 0) { + return 0; + } + // You can't skip past the last offset that we want to read with this + // block reader.
+ long oldPos = pos; + pos += n; + if (pos > visibleLength) { + pos = visibleLength; + } + return pos - oldPos; + } + + @Override + public int available() { + // We return the amount of bytes between the current offset and the visible + // length. Some of the other block readers return a shorter length than + // that. The only advantage to returning a shorter length is that the + // DFSInputStream will trash your block reader and create a new one if + // someone tries to seek() beyond the available() region. + long diff = visibleLength - pos; + if (diff > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int)diff; + } + } + + @Override + public void close() throws IOException { + accessor.close(); + } + + @Override + public void readFully(byte[] buf, int offset, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, offset, len); + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public boolean isShortCircuit() { + return accessor.isShortCircuit(); + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + // For now, pluggable ReplicaAccessors do not support zero-copy. + return null; + } + + @Override + public DataChecksum getDataChecksum() { + return null; + } + + @Override + public int getNetworkDistance() { + return accessor.getNetworkDistance(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index 3ae8b59..426fb72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -133,7 +133,7 @@ <!-- Don't complain about LocalDatanodeInfo's anonymous class --> <Match> - <Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" /> + <Class name="org.apache.hadoop.hdfs.client.impl.BlockReaderLocal$LocalDatanodeInfo$1" /> <Bug pattern="SE_BAD_FIELD_INNER_CLASS" /> </Match> <!-- Only one method increments numFailedVolumes and it is synchronized --> http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java index 7f71bf7..0379946 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; -import org.apache.hadoop.hdfs.RemoteBlockReader2; +import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote2; import org.apache.hadoop.hdfs.net.Peer; import 
org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -112,7 +112,7 @@ class StripedBlockReader { * * TODO: add proper tracer */ - return RemoteBlockReader2.newBlockReader( + return BlockReaderRemote2.newBlockReader( "dummy", block, blockToken, offsetInBlock, block.getNumBytes() - offsetInBlock, true, "", newConnectedPeer(block, dnAddr, blockToken, source), source, http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 80f510c..2451df3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -43,7 +43,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.hdfs.BlockReader; -import org.apache.hadoop.hdfs.BlockReaderFactory; +import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java index a1af1fc..32a34e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -38,7 +38,7 @@ import org.apache.commons.lang.SystemUtils; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdfs.BlockReaderTestUtil; +import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; import org.apache.hadoop.hdfs.ClientContext; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; @@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java deleted file mode 100644 index 1ca1ca5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.List; -import java.util.Random; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FsTracer; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache; -import org.apache.hadoop.hdfs.server.namenode.CacheManager; -import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.token.Token; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; - -/** - * A helper class to setup the cluster, and get to BlockReader and DataNode for a block. 
- */ -public class BlockReaderTestUtil { - /** - * Returns true if we should run tests that generate large files (> 1GB) - */ - static public boolean shouldTestLargeFiles() { - String property = System.getProperty("hdfs.test.large.files"); - if (property == null) return false; - if (property.isEmpty()) return true; - return Boolean.parseBoolean(property); - } - - private HdfsConfiguration conf = null; - private MiniDFSCluster cluster = null; - - /** - * Setup the cluster - */ - public BlockReaderTestUtil(int replicationFactor) throws Exception { - this(replicationFactor, new HdfsConfiguration()); - } - - public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception { - this.conf = config; - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor); - cluster = new MiniDFSCluster.Builder(conf).format(true).build(); - cluster.waitActive(); - } - - /** - * Shutdown cluster - */ - public void shutdown() { - if (cluster != null) { - cluster.shutdown(); - } - } - - public MiniDFSCluster getCluster() { - return cluster; - } - - public HdfsConfiguration getConf() { - return conf; - } - - /** - * Create a file of the given size filled with random data. - * @return File data. - */ - public byte[] writeFile(Path filepath, int sizeKB) - throws IOException { - FileSystem fs = cluster.getFileSystem(); - - // Write a file with the specified amount of data - DataOutputStream os = fs.create(filepath); - byte data[] = new byte[1024 * sizeKB]; - new Random().nextBytes(data); - os.write(data); - os.close(); - return data; - } - - /** - * Get the list of Blocks for a file. - */ - public List<LocatedBlock> getFileBlocks(Path filepath, int sizeKB) - throws IOException { - // Return the blocks we just wrote - DFSClient dfsclient = getDFSClient(); - return dfsclient.getNamenode().getBlockLocations( - filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks(); - } - - /** - * Get the DFSClient. - */ - public DFSClient getDFSClient() throws IOException { - InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort()); - return new DFSClient(nnAddr, conf); - } - - /** - * Exercise the BlockReader and read length bytes. - * - * It does not verify the bytes read. - */ - public void readAndCheckEOS(BlockReader reader, int length, boolean expectEof) - throws IOException { - byte buf[] = new byte[1024]; - int nRead = 0; - while (nRead < length) { - DFSClient.LOG.info("So far read " + nRead + " - going to read more."); - int n = reader.read(buf, 0, buf.length); - assertTrue(n > 0); - nRead += n; - } - - if (expectEof) { - DFSClient.LOG.info("Done reading, expect EOF for next read."); - assertEquals(-1, reader.read(buf, 0, buf.length)); - } - } - - /** - * Get a BlockReader for the given block. - */ - public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead) - throws IOException { - return getBlockReader(cluster.getFileSystem(), testBlock, offset, lenToRead); - } - - /** - * Get a BlockReader for the given block. - */ - public static BlockReader getBlockReader(final DistributedFileSystem fs, - LocatedBlock testBlock, int offset, long lenToRead) throws IOException { - InetSocketAddress targetAddr = null; - ExtendedBlock block = testBlock.getBlock(); - DatanodeInfo[] nodes = testBlock.getLocations(); - targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); - - return new BlockReaderFactory(fs.getClient().getConf()). - setInetSocketAddress(targetAddr). - setBlock(block). 
- setFileName(targetAddr.toString()+ ":" + block.getBlockId()). - setBlockToken(testBlock.getBlockToken()). - setStartOffset(offset). - setLength(lenToRead). - setVerifyChecksum(true). - setClientName("BlockReaderTestUtil"). - setDatanodeInfo(nodes[0]). - setClientCacheContext(ClientContext.getFromConf(fs.getConf())). - setCachingStrategy(CachingStrategy.newDefaultStrategy()). - setConfiguration(fs.getConf()). - setAllowShortCircuitLocalReads(true). - setTracer(FsTracer.get(fs.getConf())). - setRemotePeerFactory(new RemotePeerFactory() { - @Override - public Peer newConnectedPeer(InetSocketAddress addr, - Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) - throws IOException { - Peer peer = null; - Socket sock = NetUtils. - getDefaultSocketFactory(fs.getConf()).createSocket(); - try { - sock.connect(addr, HdfsConstants.READ_TIMEOUT); - sock.setSoTimeout(HdfsConstants.READ_TIMEOUT); - peer = DFSUtilClient.peerFromSocket(sock); - } finally { - if (peer == null) { - IOUtils.closeQuietly(sock); - } - } - return peer; - } - }). - build(); - } - - /** - * Get a DataNode that serves our testBlock. - */ - public DataNode getDataNode(LocatedBlock testBlock) { - DatanodeInfo[] nodes = testBlock.getLocations(); - int ipcport = nodes[0].getIpcPort(); - return cluster.getDataNode(ipcport); - } - - public static void enableHdfsCachingTracing() { - LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(CacheManager.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(FsDatasetCache.class.getName()).setLevel( - Level.TRACE); - } - - public static void enableBlockReaderFactoryTracing() { - LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(ShortCircuitCache.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(BlockReaderLocal.class.getName()).setLevel( - Level.TRACE); - } - - public static void enableShortCircuitShmTracing() { - LogManager.getLogger(DfsClientShmManager.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(ShortCircuitRegistry.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(ShortCircuitShm.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(DataNode.class.getName()).setLevel( - Level.TRACE); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java index ccc94a4..3c58133 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 
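The BlockReaderUtil helpers added earlier in this patch give BlockReader implementations two distinct read contracts: readAll() tolerates a short read at end of stream and returns however many bytes it managed to get, while readFully() treats a premature EOF as an error. A minimal, self-contained sketch of that difference (not part of the patch; it mirrors the helpers against a plain InputStream stub instead of a real BlockReader, and the class name is illustrative):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadContractSketch {

  // readAll-style: loop until len bytes or EOF; a short count is not an error.
  static int readAll(InputStream in, byte[] buf, int off, int len) throws IOException {
    int n = 0;
    while (n < len) {
      int nread = in.read(buf, off + n, len - n);
      if (nread <= 0) {
        return (n == 0) ? nread : n;
      }
      n += nread;
    }
    return n;
  }

  // readFully-style: same loop, but hitting EOF before len bytes is fatal.
  static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
    int toRead = len;
    while (toRead > 0) {
      int ret = in.read(buf, off, toRead);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream");
      }
      toRead -= ret;
      off += ret;
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] available = new byte[10];   // only 10 bytes in the stream
    byte[] buf = new byte[32];

    int got = readAll(new ByteArrayInputStream(available), buf, 0, buf.length);
    System.out.println("readAll returned " + got);   // 10: caller sees a short read

    try {
      readFully(new ByteArrayInputStream(available), buf, 0, buf.length);
    } catch (IOException e) {
      System.out.println("readFully threw: " + e.getMessage());
    }
  }
}

In the patch itself, ExternalBlockReader and BlockReaderRemote2 delegate their readAll()/readFully() methods to these shared helpers, so callers see the same semantics from either reader.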
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java deleted file mode 100644 index 3d916a7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.Random; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -abstract public class TestBlockReaderBase { - private BlockReaderTestUtil util; - private byte[] blockData; - private BlockReader reader; - - /** - * if override this, make sure return array length is less than - * block size. 
- */ - byte [] getBlockData() { - int length = 1 << 22; - byte[] data = new byte[length]; - for (int i = 0; i < length; i++) { - data[i] = (byte) (i % 133); - } - return data; - } - - private BlockReader getBlockReader(LocatedBlock block) throws Exception { - return util.getBlockReader(block, 0, blockData.length); - } - - abstract HdfsConfiguration createConf(); - - @Before - public void setup() throws Exception { - util = new BlockReaderTestUtil(1, createConf()); - blockData = getBlockData(); - DistributedFileSystem fs = util.getCluster().getFileSystem(); - Path testfile = new Path("/testfile"); - FSDataOutputStream fout = fs.create(testfile); - fout.write(blockData); - fout.close(); - LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0); - reader = getBlockReader(blk); - } - - @After - public void shutdown() throws Exception { - util.shutdown(); - } - - @Test(timeout=60000) - public void testSkip() throws IOException { - Random random = new Random(); - byte [] buf = new byte[1]; - for (int pos = 0; pos < blockData.length;) { - long skip = random.nextInt(100) + 1; - long skipped = reader.skip(skip); - if (pos + skip >= blockData.length) { - assertEquals(blockData.length, pos + skipped); - break; - } else { - assertEquals(skip, skipped); - pos += skipped; - assertEquals(1, reader.read(buf, 0, 1)); - - assertEquals(blockData[pos], buf[0]); - pos += 1; - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java deleted file mode 100644 index a392c6c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java +++ /dev/null @@ -1,534 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS; -import static org.hamcrest.CoreMatchers.equalTo; - -import java.io.File; -import java.io.IOException; -import java.nio.channels.ClosedByInterruptException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo; -import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.net.unix.TemporarySocketDirectory; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.util.concurrent.Uninterruptibles; - -public class TestBlockReaderFactory { - static final Log LOG = LogFactory.getLog(TestBlockReaderFactory.class); - - @Before - public void init() { - DomainSocket.disableBindPathValidation(); - Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); - } - - @After - public void cleanup() { - DFSInputStream.tcpReadsDisabledForTesting = false; - BlockReaderFactory.createShortCircuitReplicaInfoCallback = null; - } - - public static Configuration createShortCircuitConf(String testName, - TemporarySocketDirectory sockDir) { - Configuration conf = new Configuration(); - conf.set(DFS_CLIENT_CONTEXT, testName); - conf.setLong(DFS_BLOCK_SIZE_KEY, 4096); - conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(), - testName + "._PORT").getAbsolutePath()); - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, - false); - conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false); - return conf; - } - - /** - * If we have a UNIX domain socket configured, - * and we have dfs.client.domain.socket.data.traffic set to true, - * and short-circuit access fails, we should still be able to pass - * data traffic over the UNIX domain socket. Test this. - */ - @Test(timeout=60000) - public void testFallbackFromShortCircuitToUnixDomainTraffic() - throws Exception { - DFSInputStream.tcpReadsDisabledForTesting = true; - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - - // The server is NOT configured with short-circuit local reads; - // the client is. Both support UNIX domain reads. 
- Configuration clientConf = createShortCircuitConf( - "testFallbackFromShortCircuitToUnixDomainTraffic", sockDir); - clientConf.set(DFS_CLIENT_CONTEXT, - "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext"); - clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true); - Configuration serverConf = new Configuration(clientConf); - serverConf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false); - - MiniDFSCluster cluster = - new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); - cluster.waitActive(); - FileSystem dfs = FileSystem.get(cluster.getURI(0), clientConf); - String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 8193; - final int SEED = 0xFADED; - DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE)); - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(contents, expected)); - cluster.shutdown(); - sockDir.close(); - } - - /** - * Test the case where we have multiple threads waiting on the - * ShortCircuitCache delivering a certain ShortCircuitReplica. - * - * In this case, there should only be one call to - * createShortCircuitReplicaInfo. This one replica should be shared - * by all threads. - */ - @Test(timeout=60000) - public void testMultipleWaitersOnShortCircuitCache() - throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicBoolean creationIsBlocked = new AtomicBoolean(true); - final AtomicBoolean testFailed = new AtomicBoolean(false); - DFSInputStream.tcpReadsDisabledForTesting = true; - BlockReaderFactory.createShortCircuitReplicaInfoCallback = - new ShortCircuitCache.ShortCircuitReplicaCreator() { - @Override - public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { - Uninterruptibles.awaitUninterruptibly(latch); - if (!creationIsBlocked.compareAndSet(true, false)) { - Assert.fail("there were multiple calls to " - + "createShortCircuitReplicaInfo. Only one was expected."); - } - return null; - } - }; - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration conf = createShortCircuitConf( - "testMultipleWaitersOnShortCircuitCache", sockDir); - MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4000; - final int SEED = 0xFADED; - final int NUM_THREADS = 10; - DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - Runnable readerRunnable = new Runnable() { - @Override - public void run() { - try { - byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE)); - Assert.assertFalse(creationIsBlocked.get()); - byte expected[] = DFSTestUtil. 
- calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(contents, expected)); - } catch (Throwable e) { - LOG.error("readerRunnable error", e); - testFailed.set(true); - } - } - }; - Thread threads[] = new Thread[NUM_THREADS]; - for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new Thread(readerRunnable); - threads[i].start(); - } - Thread.sleep(500); - latch.countDown(); - for (int i = 0; i < NUM_THREADS; i++) { - Uninterruptibles.joinUninterruptibly(threads[i]); - } - cluster.shutdown(); - sockDir.close(); - Assert.assertFalse(testFailed.get()); - } - - /** - * Test the case where we have a failure to complete a short circuit read - * that occurs, and then later on, we have a success. - * Any thread waiting on a cache load should receive the failure (if it - * occurs); however, the failure result should not be cached. We want - * to be able to retry later and succeed. - */ - @Test(timeout=60000) - public void testShortCircuitCacheTemporaryFailure() - throws Exception { - BlockReaderTestUtil.enableBlockReaderFactoryTracing(); - final AtomicBoolean replicaCreationShouldFail = new AtomicBoolean(true); - final AtomicBoolean testFailed = new AtomicBoolean(false); - DFSInputStream.tcpReadsDisabledForTesting = true; - BlockReaderFactory.createShortCircuitReplicaInfoCallback = - new ShortCircuitCache.ShortCircuitReplicaCreator() { - @Override - public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { - if (replicaCreationShouldFail.get()) { - // Insert a short delay to increase the chance that one client - // thread waits for the other client thread's failure via - // a condition variable. - Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); - return new ShortCircuitReplicaInfo(); - } - return null; - } - }; - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration conf = createShortCircuitConf( - "testShortCircuitCacheTemporaryFailure", sockDir); - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4000; - final int NUM_THREADS = 2; - final int SEED = 0xFADED; - final CountDownLatch gotFailureLatch = new CountDownLatch(NUM_THREADS); - final CountDownLatch shouldRetryLatch = new CountDownLatch(1); - DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - Runnable readerRunnable = new Runnable() { - @Override - public void run() { - try { - // First time should fail. - List<LocatedBlock> locatedBlocks = - cluster.getNameNode().getRpcServer().getBlockLocations( - TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks(); - LocatedBlock lblock = locatedBlocks.get(0); // first block - BlockReader blockReader = null; - try { - blockReader = BlockReaderTestUtil.getBlockReader( - cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); - Assert.fail("expected getBlockReader to fail the first time."); - } catch (Throwable t) { - Assert.assertTrue("expected to see 'TCP reads were disabled " + - "for testing' in exception " + t, t.getMessage().contains( - "TCP reads were disabled for testing")); - } finally { - if (blockReader != null) blockReader.close(); // keep findbugs happy - } - gotFailureLatch.countDown(); - shouldRetryLatch.await(); - - // Second time should succeed. 
- try { - blockReader = BlockReaderTestUtil.getBlockReader( - cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); - } catch (Throwable t) { - LOG.error("error trying to retrieve a block reader " + - "the second time.", t); - throw t; - } finally { - if (blockReader != null) blockReader.close(); - } - } catch (Throwable t) { - LOG.error("getBlockReader failure", t); - testFailed.set(true); - } - } - }; - Thread threads[] = new Thread[NUM_THREADS]; - for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new Thread(readerRunnable); - threads[i].start(); - } - gotFailureLatch.await(); - replicaCreationShouldFail.set(false); - shouldRetryLatch.countDown(); - for (int i = 0; i < NUM_THREADS; i++) { - Uninterruptibles.joinUninterruptibly(threads[i]); - } - cluster.shutdown(); - sockDir.close(); - Assert.assertFalse(testFailed.get()); - } - - /** - * Test that a client which supports short-circuit reads using - * shared memory can fall back to not using shared memory when - * the server doesn't support it. - */ - @Test - public void testShortCircuitReadFromServerWithoutShm() throws Exception { - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration clientConf = createShortCircuitConf( - "testShortCircuitReadFromServerWithoutShm", sockDir); - Configuration serverConf = new Configuration(clientConf); - serverConf.setInt( - DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0); - DFSInputStream.tcpReadsDisabledForTesting = true; - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); - cluster.waitActive(); - clientConf.set(DFS_CLIENT_CONTEXT, - "testShortCircuitReadFromServerWithoutShm_clientContext"); - final DistributedFileSystem fs = - (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4000; - final int SEED = 0xFADEC; - DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(contents, expected)); - final ShortCircuitCache cache = - fs.dfs.getClientContext().getShortCircuitCache(); - final DatanodeInfo datanode = - new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId()); - cache.getDfsClientShmManager().visit(new Visitor() { - @Override - public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) - throws IOException { - Assert.assertEquals(1, info.size()); - PerDatanodeVisitorInfo vinfo = info.get(datanode); - Assert.assertTrue(vinfo.disabled); - Assert.assertEquals(0, vinfo.full.size()); - Assert.assertEquals(0, vinfo.notFull.size()); - } - }); - cluster.shutdown(); - sockDir.close(); - } - - /** - * Test that a client which does not support short-circuit reads using - * shared memory can talk with a server which supports it. 
- */ - @Test - public void testShortCircuitReadFromClientWithoutShm() throws Exception { - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration clientConf = createShortCircuitConf( - "testShortCircuitReadWithoutShm", sockDir); - Configuration serverConf = new Configuration(clientConf); - DFSInputStream.tcpReadsDisabledForTesting = true; - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); - cluster.waitActive(); - clientConf.setInt( - DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0); - clientConf.set(DFS_CLIENT_CONTEXT, - "testShortCircuitReadFromClientWithoutShm_clientContext"); - final DistributedFileSystem fs = - (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4000; - final int SEED = 0xFADEC; - DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(contents, expected)); - final ShortCircuitCache cache = - fs.dfs.getClientContext().getShortCircuitCache(); - Assert.assertEquals(null, cache.getDfsClientShmManager()); - cluster.shutdown(); - sockDir.close(); - } - - /** - * Test shutting down the ShortCircuitCache while there are things in it. - */ - @Test - public void testShortCircuitCacheShutdown() throws Exception { - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration conf = createShortCircuitConf( - "testShortCircuitCacheShutdown", sockDir); - conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown"); - Configuration serverConf = new Configuration(conf); - DFSInputStream.tcpReadsDisabledForTesting = true; - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); - cluster.waitActive(); - final DistributedFileSystem fs = - (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4000; - final int SEED = 0xFADEC; - DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(contents, expected)); - final ShortCircuitCache cache = - fs.dfs.getClientContext().getShortCircuitCache(); - cache.close(); - Assert.assertTrue(cache.getDfsClientShmManager(). - getDomainSocketWatcher().isClosed()); - cluster.shutdown(); - sockDir.close(); - } - - /** - * When an InterruptedException is sent to a thread calling - * FileChannel#read, the FileChannel is immediately closed and the - * thread gets an exception. This effectively means that we might have - * someone asynchronously calling close() on the file descriptors we use - * in BlockReaderLocal. So when unreferencing a ShortCircuitReplica in - * ShortCircuitCache#unref, we should check if the FileChannel objects - * are still open. If not, we should purge the replica to avoid giving - * it out to any future readers. - * - * This is a regression test for HDFS-6227: Short circuit read failed - * due to ClosedChannelException. 
- * - * Note that you may still get ClosedChannelException errors if two threads - * are reading from the same replica and an InterruptedException is delivered - * to one of them. - */ - @Test(timeout=120000) - public void testPurgingClosedReplicas() throws Exception { - BlockReaderTestUtil.enableBlockReaderFactoryTracing(); - final AtomicInteger replicasCreated = new AtomicInteger(0); - final AtomicBoolean testFailed = new AtomicBoolean(false); - DFSInputStream.tcpReadsDisabledForTesting = true; - BlockReaderFactory.createShortCircuitReplicaInfoCallback = - new ShortCircuitCache.ShortCircuitReplicaCreator() { - @Override - public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { - replicasCreated.incrementAndGet(); - return null; - } - }; - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration conf = createShortCircuitConf( - "testPurgingClosedReplicas", sockDir); - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4095; - final int SEED = 0xFADE0; - final DistributedFileSystem fs = - (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf); - DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - - final Semaphore sem = new Semaphore(0); - final List<LocatedBlock> locatedBlocks = - cluster.getNameNode().getRpcServer().getBlockLocations( - TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks(); - final LocatedBlock lblock = locatedBlocks.get(0); // first block - final byte[] buf = new byte[TEST_FILE_LEN]; - Runnable readerRunnable = new Runnable() { - @Override - public void run() { - try { - while (true) { - BlockReader blockReader = null; - try { - blockReader = BlockReaderTestUtil.getBlockReader( - cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); - sem.release(); - try { - blockReader.readAll(buf, 0, TEST_FILE_LEN); - } finally { - sem.acquireUninterruptibly(); - } - } catch (ClosedByInterruptException e) { - LOG.info("got the expected ClosedByInterruptException", e); - sem.release(); - break; - } finally { - if (blockReader != null) blockReader.close(); - } - LOG.info("read another " + TEST_FILE_LEN + " bytes."); - } - } catch (Throwable t) { - LOG.error("getBlockReader failure", t); - testFailed.set(true); - sem.release(); - } - } - }; - Thread thread = new Thread(readerRunnable); - thread.start(); - - // While the thread is reading, send it interrupts. - // These should trigger a ClosedChannelException. - while (thread.isAlive()) { - sem.acquireUninterruptibly(); - thread.interrupt(); - sem.release(); - } - Assert.assertFalse(testFailed.get()); - - // We should be able to read from the file without - // getting a ClosedChannelException. - BlockReader blockReader = null; - try { - blockReader = BlockReaderTestUtil.getBlockReader( - cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); - blockReader.readFully(buf, 0, TEST_FILE_LEN); - } finally { - if (blockReader != null) blockReader.close(); - } - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(buf, expected)); - - // Another ShortCircuitReplica object should have been created. - Assert.assertEquals(2, replicasCreated.get()); - - dfs.close(); - cluster.shutdown(); - sockDir.close(); - } -}
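All of the deleted tests above bootstrap short-circuit reads through the same createShortCircuitConf helper. The condensed sketch below restates that client-side setup with comments on what each key controls; the class name ShortCircuitConfSketch is hypothetical, the File parameter stands in for the TemporarySocketDirectory used by the tests, and the configuration constants are exactly the ones imported by the deleted file.

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;

public class ShortCircuitConfSketch {
  /** Build a client Configuration equivalent to createShortCircuitConf above. */
  public static Configuration build(String testName, File sockDir) {
    Configuration conf = new Configuration();
    // A per-test client context keeps ClientContext caches from being shared.
    conf.set(DFS_CLIENT_CONTEXT, testName);
    // Small blocks keep the test files multi-block without being large.
    conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
    // UNIX domain socket path shared by the DataNode and the client.
    conf.set(DFS_DOMAIN_SOCKET_PATH_KEY,
        new File(sockDir, testName + "._PORT").getAbsolutePath());
    // Enable short-circuit local reads, with checksums still verified.
    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
    // Do not route ordinary data traffic over the domain socket by default.
    conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
    return conf;
  }
}

Each deleted test then builds a MiniDFSCluster against a copy of this configuration and flips individual keys (for example, disabling Read.ShortCircuit.KEY on the server side) to exercise a specific fallback path.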
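The deleted testPurgingClosedReplicas leans on a standard java.nio behavior: interrupting a thread that is using a FileChannel closes the channel and surfaces a ClosedByInterruptException, which is why ShortCircuitCache#unref must purge replicas whose file channels were closed out from under them. Below is a small, self-contained sketch of just that NIO behavior, with no HDFS involved; the class name, file name, and sizes are arbitrary.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class InterruptClosesChannel {
  public static void main(String[] args) throws Exception {
    Path tmp = Files.createTempFile("interrupt-demo", ".bin");
    try {
      Files.write(tmp, new byte[8 * 1024 * 1024]);
      try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.READ)) {
        Thread reader = new Thread(() -> {
          ByteBuffer buf = ByteBuffer.allocate(4096);
          try {
            // Loop over the same file forever; only an interrupt stops us.
            while (channel.read(buf) >= 0) {
              buf.clear();
              channel.position(0);
            }
          } catch (ClosedByInterruptException e) {
            System.out.println("channel was closed by the interrupt, as expected");
          } catch (IOException e) {
            System.out.println("unexpected I/O error: " + e);
          }
        });
        reader.start();
        Thread.sleep(100);  // let the reader start its read loop
        // Whether the reader is blocked in read() or between calls, its next
        // channel operation closes the channel and throws ClosedByInterruptException.
        reader.interrupt();
        reader.join();
        // The channel is now unusable for any other reader of the same object.
        System.out.println("channel still open? " + channel.isOpen()); // prints false
      }
    } finally {
      Files.deleteIfExists(tmp);
    }
  }
}

Running this prints the "closed by the interrupt" message and reports the channel as no longer open, which is exactly the state the short-circuit cache has to detect before handing a cached replica to a future reader.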
