http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java deleted file mode 100644 index 85f925f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.hadoop.classification.InterfaceAudience; - -import java.io.IOException; - -/** - * For sharing between the local and remote block reader implementations. - */ -@InterfaceAudience.Private -class BlockReaderUtil { - - /* See {@link BlockReader#readAll(byte[], int, int)} */ - public static int readAll(BlockReader reader, - byte[] buf, int offset, int len) throws IOException { - int n = 0; - for (;;) { - int nread = reader.read(buf, offset + n, len - n); - if (nread <= 0) - return (n == 0) ? nread : n; - n += nread; - if (n >= len) - return n; - } - } - - /* See {@link BlockReader#readFully(byte[], int, int)} */ - public static void readFully(BlockReader reader, - byte[] buf, int off, int len) throws IOException { - int toRead = len; - while (toRead > 0) { - int ret = reader.read(buf, off, toRead); - if (ret < 0) { - throw new IOException("Premature EOF from inputStream"); - } - toRead -= ret; - off += ret; - } - } -}
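For reference, the two helpers removed above differ only in how a short read is treated: readAll() loops until it has read len bytes or hits end-of-stream and reports how many bytes it actually got, while readFully() treats a premature end-of-stream as an error. Below is a minimal standalone sketch (not part of this commit) that mirrors the same two loops against a plain java.io.InputStream instead of a BlockReader, so it can run outside HDFS; the class name ReadLoopsDemo is illustrative only.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadLoopsDemo {
  // Mirrors BlockReaderUtil.readAll: keep reading until len bytes or EOF, then
  // report how many bytes were actually obtained (-1 only if EOF was hit
  // before anything was read).
  static int readAll(InputStream in, byte[] buf, int offset, int len)
      throws IOException {
    int n = 0;
    for (;;) {
      int nread = in.read(buf, offset + n, len - n);
      if (nread <= 0)
        return (n == 0) ? nread : n;
      n += nread;
      if (n >= len)
        return n;
    }
  }

  // Mirrors BlockReaderUtil.readFully: a short read is a hard failure.
  static void readFully(InputStream in, byte[] buf, int off, int len)
      throws IOException {
    int toRead = len;
    while (toRead > 0) {
      int ret = in.read(buf, off, toRead);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream");
      }
      toRead -= ret;
      off += ret;
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] data = "hello".getBytes();
    byte[] buf = new byte[16];
    // readAll stops at EOF and reports how much it actually read (5 here).
    System.out.println(readAll(new ByteArrayInputStream(data), buf, 0, buf.length));
    // readFully asked for more bytes than are available, so it throws.
    try {
      readFully(new ByteArrayInputStream(data), buf, 0, buf.length);
    } catch (IOException e) {
      System.out.println("readFully failed as expected: " + e.getMessage());
    }
  }
}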
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 6bef2bf..23cae0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -59,6 +59,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java deleted file mode 100644 index fae2cc0..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.EnumSet; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; - -/** - * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from - * replicas. 
- */ [email protected] -public final class ExternalBlockReader implements BlockReader { - private final ReplicaAccessor accessor; - private final long visibleLength; - private long pos; - - ExternalBlockReader(ReplicaAccessor accessor, long visibleLength, - long startOffset) { - this.accessor = accessor; - this.visibleLength = visibleLength; - this.pos = startOffset; - } - - @Override - public int read(byte[] buf, int off, int len) throws IOException { - int nread = accessor.read(pos, buf, off, len); - if (nread < 0) { - return nread; - } - pos += nread; - return nread; - } - - @Override - public int read(ByteBuffer buf) throws IOException { - int nread = accessor.read(pos, buf); - if (nread < 0) { - return nread; - } - pos += nread; - return nread; - } - - @Override - public long skip(long n) throws IOException { - // You cannot skip backwards - if (n <= 0) { - return 0; - } - // You can't skip past the last offset that we want to read with this - // block reader. - long oldPos = pos; - pos += n; - if (pos > visibleLength) { - pos = visibleLength; - } - return pos - oldPos; - } - - @Override - public int available() { - // We return the amount of bytes between the current offset and the visible - // length. Some of the other block readers return a shorter length than - // that. The only advantage to returning a shorter length is that the - // DFSInputStream will trash your block reader and create a new one if - // someone tries to seek() beyond the available() region. - long diff = visibleLength - pos; - if (diff > Integer.MAX_VALUE) { - return Integer.MAX_VALUE; - } else { - return (int)diff; - } - } - - @Override - public void close() throws IOException { - accessor.close(); - } - - @Override - public void readFully(byte[] buf, int offset, int len) throws IOException { - BlockReaderUtil.readFully(this, buf, offset, len); - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return BlockReaderUtil.readAll(this, buf, offset, len); - } - - @Override - public boolean isLocal() { - return accessor.isLocal(); - } - - @Override - public boolean isShortCircuit() { - return accessor.isShortCircuit(); - } - - @Override - public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { - // For now, pluggable ReplicaAccessors do not support zero-copy. - return null; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java deleted file mode 100644 index 028c964..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ /dev/null @@ -1,510 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.EnumSet; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.FSInputChecker; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.core.TraceScope; -import org.apache.htrace.core.Tracer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * @deprecated this is an old implementation that is being left around - * in case any issues spring up with the new {@link RemoteBlockReader2} - * implementation. - * It will be removed in the next release. - */ -@InterfaceAudience.Private -@Deprecated -public class RemoteBlockReader extends FSInputChecker implements BlockReader { - static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class); - - private final Peer peer; - private final DatanodeID datanodeID; - private final DataInputStream in; - private DataChecksum checksum; - - /** offset in block of the last chunk received */ - private long lastChunkOffset = -1; - private long lastChunkLen = -1; - private long lastSeqNo = -1; - - /** offset in block where reader wants to actually read */ - private long startOffset; - - private final long blockId; - - /** offset in block of of first chunk - may be less than startOffset - if startOffset is not chunk-aligned */ - private final long firstChunkOffset; - - private final int bytesPerChecksum; - private final int checksumSize; - - /** - * The total number of bytes we need to transfer from the DN. - * This is the amount that the user has requested plus some padding - * at the beginning so that the read can begin on a chunk boundary. - */ - private final long bytesNeededToFinish; - - /** - * True if we are reading from a local DataNode.
- */ - private final boolean isLocal; - - private boolean eos = false; - private boolean sentStatusCode = false; - - ByteBuffer checksumBytes = null; - /** Amount of unread data in the current received packet */ - int dataLeft = 0; - - private final PeerCache peerCache; - - private final Tracer tracer; - - /* FSInputChecker interface */ - - /* same interface as inputStream java.io.InputStream#read() - * used by DFSInputStream#read() - * This violates one rule when there is a checksum error: - * "Read should not modify user buffer before successful read" - * because it first reads the data to user buffer and then checks - * the checksum. - */ - @Override - public synchronized int read(byte[] buf, int off, int len) - throws IOException { - - // This has to be set here, *before* the skip, since we can - // hit EOS during the skip, in the case that our entire read - // is smaller than the checksum chunk. - boolean eosBefore = eos; - - //for the first read, skip the extra bytes at the front. - if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { - // Skip these bytes. But don't call this.skip()! - int toSkip = (int)(startOffset - firstChunkOffset); - if ( super.readAndDiscard(toSkip) != toSkip ) { - // should never happen - throw new IOException("Could not skip required number of bytes"); - } - } - - int nRead = super.read(buf, off, len); - - // if eos was set in the previous read, send a status code to the DN - if (eos && !eosBefore && nRead >= 0) { - if (needChecksum()) { - sendReadResult(peer, Status.CHECKSUM_OK); - } else { - sendReadResult(peer, Status.SUCCESS); - } - } - return nRead; - } - - @Override - public synchronized long skip(long n) throws IOException { - /* How can we make sure we don't throw a ChecksumException, at least - * in majority of the cases?. This one throws. */ - long nSkipped = 0; - while (nSkipped < n) { - int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE); - int ret = readAndDiscard(toSkip); - if (ret <= 0) { - return nSkipped; - } - nSkipped += ret; - } - return nSkipped; - } - - @Override - public int read() throws IOException { - throw new IOException("read() is not expected to be invoked. " + - "Use read(buf, off, len) instead."); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - /* Checksum errors are handled outside the BlockReader. - * DFSInputStream does not always call 'seekToNewSource'. In the - * case of pread(), it just tries a different replica without seeking. - */ - return false; - } - - @Override - public void seek(long pos) throws IOException { - throw new IOException("Seek() is not supported in BlockInputChecker"); - } - - @Override - protected long getChunkPosition(long pos) { - throw new RuntimeException("getChunkPosition() is not supported, " + - "since seek is not required"); - } - - /** - * Makes sure that checksumBytes has enough capacity - * and limit is set to the number of checksum bytes needed - * to be read. 
- */ - private void adjustChecksumBytes(int dataLen) { - int requiredSize = - ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; - if (checksumBytes == null || requiredSize > checksumBytes.capacity()) { - checksumBytes = ByteBuffer.wrap(new byte[requiredSize]); - } else { - checksumBytes.clear(); - } - checksumBytes.limit(requiredSize); - } - - @Override - protected synchronized int readChunk(long pos, byte[] buf, int offset, - int len, byte[] checksumBuf) - throws IOException { - try (TraceScope ignored = tracer.newScope( - "RemoteBlockReader#readChunk(" + blockId + ")")) { - return readChunkImpl(pos, buf, offset, len, checksumBuf); - } - } - - private synchronized int readChunkImpl(long pos, byte[] buf, int offset, - int len, byte[] checksumBuf) - throws IOException { - // Read one chunk. - if (eos) { - // Already hit EOF - return -1; - } - - // Read one DATA_CHUNK. - long chunkOffset = lastChunkOffset; - if ( lastChunkLen > 0 ) { - chunkOffset += lastChunkLen; - } - - // pos is relative to the start of the first chunk of the read. - // chunkOffset is relative to the start of the block. - // This makes sure that the read passed from FSInputChecker is the - // for the same chunk we expect to be reading from the DN. - if ( (pos + firstChunkOffset) != chunkOffset ) { - throw new IOException("Mismatch in pos : " + pos + " + " + - firstChunkOffset + " != " + chunkOffset); - } - - // Read next packet if the previous packet has been read completely. - if (dataLeft <= 0) { - //Read packet headers. - PacketHeader header = new PacketHeader(); - header.readFields(in); - - LOG.debug("DFSClient readChunk got header {}", header); - - // Sanity check the lengths - if (!header.sanityCheck(lastSeqNo)) { - throw new IOException("BlockReader: error in packet header " + - header); - } - - lastSeqNo = header.getSeqno(); - dataLeft = header.getDataLen(); - adjustChecksumBytes(header.getDataLen()); - if (header.getDataLen() > 0) { - IOUtils.readFully(in, checksumBytes.array(), 0, - checksumBytes.limit()); - } - } - - // Sanity checks - assert len >= bytesPerChecksum; - assert checksum != null; - assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0); - - - int checksumsToRead, bytesToRead; - - if (checksumSize > 0) { - - // How many chunks left in our packet - this is a ceiling - // since we may have a partial chunk at the end of the file - int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1; - - // How many chunks we can fit in databuffer - // - note this is a floor since we always read full chunks - int chunksCanFit = Math.min(len / bytesPerChecksum, - checksumBuf.length / checksumSize); - - // How many chunks should we read - checksumsToRead = Math.min(chunksLeft, chunksCanFit); - // How many bytes should we actually read - bytesToRead = Math.min( - checksumsToRead * bytesPerChecksum, // full chunks - dataLeft); // in case we have a partial - } else { - // no checksum - bytesToRead = Math.min(dataLeft, len); - checksumsToRead = 0; - } - - if ( bytesToRead > 0 ) { - // Assert we have enough space - assert bytesToRead <= len; - assert checksumBytes.remaining() >= checksumSize * checksumsToRead; - assert checksumBuf.length >= checksumSize * checksumsToRead; - IOUtils.readFully(in, buf, offset, bytesToRead); - checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead); - } - - dataLeft -= bytesToRead; - assert dataLeft >= 0; - - lastChunkOffset = chunkOffset; - lastChunkLen = bytesToRead; - - // If there's no data left in the current packet after satisfying - // this read, 
and we have satisfied the client read, we expect - // an empty packet header from the DN to signify this. - // Note that pos + bytesToRead may in fact be greater since the - // DN finishes off the entire last chunk. - if (dataLeft == 0 && - pos + bytesToRead >= bytesNeededToFinish) { - - // Read header - PacketHeader hdr = new PacketHeader(); - hdr.readFields(in); - - if (!hdr.isLastPacketInBlock() || - hdr.getDataLen() != 0) { - throw new IOException("Expected empty end-of-read packet! Header: " + - hdr); - } - - eos = true; - } - - if ( bytesToRead == 0 ) { - return -1; - } - - return bytesToRead; - } - - private RemoteBlockReader(String file, String bpid, long blockId, - DataInputStream in, DataChecksum checksum, boolean verifyChecksum, - long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) { - // Path is used only for printing block and file information in debug - super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId + - ":" + bpid + ":of:"+ file)/*too non path-like?*/, - 1, verifyChecksum, - checksum.getChecksumSize() > 0? checksum : null, - checksum.getBytesPerChecksum(), - checksum.getChecksumSize()); - - this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. - createSocketAddr(datanodeID.getXferAddr())); - - this.peer = peer; - this.datanodeID = datanodeID; - this.in = in; - this.checksum = checksum; - this.startOffset = Math.max( startOffset, 0 ); - this.blockId = blockId; - - // The total number of bytes that we need to transfer from the DN is - // the amount that the user wants (bytesToRead), plus the padding at - // the beginning in order to chunk-align. Note that the DN may elect - // to send more than this amount if the read starts/ends mid-chunk. - this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); - - this.firstChunkOffset = firstChunkOffset; - lastChunkOffset = firstChunkOffset; - lastChunkLen = -1; - - bytesPerChecksum = this.checksum.getBytesPerChecksum(); - checksumSize = this.checksum.getChecksumSize(); - this.peerCache = peerCache; - this.tracer = tracer; - } - - /** - * Create a new BlockReader specifically to satisfy a read. - * This method also sends the OP_READ_BLOCK request. - * - * @param file File location - * @param block The block object - * @param blockToken The block token for security - * @param startOffset The read offset, relative to block head - * @param len The number of bytes to read - * @param bufferSize The IO buffer size (not the client buffer size) - * @param verifyChecksum Whether to verify checksum - * @param clientName Client name - * @return New BlockReader instance, or null on error. 
- */ - public static RemoteBlockReader newBlockReader(String file, - ExtendedBlock block, - Token<BlockTokenIdentifier> blockToken, - long startOffset, long len, - int bufferSize, boolean verifyChecksum, - String clientName, Peer peer, - DatanodeID datanodeID, - PeerCache peerCache, - CachingStrategy cachingStrategy, - Tracer tracer) - throws IOException { - // in and out will be closed when sock is closed (by the caller) - final DataOutputStream out = - new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); - new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, - verifyChecksum, cachingStrategy); - - // - // Get bytes in block, set streams - // - - DataInputStream in = new DataInputStream( - new BufferedInputStream(peer.getInputStream(), bufferSize)); - - BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); - RemoteBlockReader2.checkSuccess(status, peer, block, file); - ReadOpChecksumInfoProto checksumInfo = - status.getReadOpChecksumInfo(); - DataChecksum checksum = DataTransferProtoUtil.fromProto( - checksumInfo.getChecksum()); - //Warning when we get CHECKSUM_NULL? - - // Read the first chunk offset. - long firstChunkOffset = checksumInfo.getChunkOffset(); - - if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || - firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { - throw new IOException("BlockReader: error in first chunk offset (" + - firstChunkOffset + ") startOffset is " + - startOffset + " for file " + file); - } - - return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), - in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, - peer, datanodeID, peerCache, tracer); - } - - @Override - public synchronized void close() throws IOException { - startOffset = -1; - checksum = null; - if (peerCache != null & sentStatusCode) { - peerCache.put(datanodeID, peer); - } else { - peer.close(); - } - - // in will be closed when its Socket is closed. - } - - @Override - public void readFully(byte[] buf, int readOffset, int amtToRead) - throws IOException { - IOUtils.readFully(this, buf, readOffset, amtToRead); - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return readFully(this, buf, offset, len); - } - - /** - * When the reader reaches end of the read, it sends a status response - * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN - * closing our connection (which we will re-open), but won't affect - * data correctness. - */ - void sendReadResult(Peer peer, Status statusCode) { - assert !sentStatusCode : "already sent status code to " + peer; - try { - RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode); - sentStatusCode = true; - } catch (IOException e) { - // It's ok not to be able to send this. But something is probably wrong. - LOG.info("Could not send read status (" + statusCode + ") to datanode " + - peer.getRemoteAddressString() + ": " + e.getMessage()); - } - } - - @Override - public int read(ByteBuffer buf) throws IOException { - throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader"); - } - - @Override - public int available() { - // An optimistic estimate of how much data is available - // to us without doing network I/O. 
- return RemoteBlockReader2.TCP_WINDOW_SIZE; - } - - @Override - public boolean isLocal() { - return isLocal; - } - - @Override - public boolean isShortCircuit() { - return false; - } - - @Override - public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java deleted file mode 100644 index c15bd1b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ /dev/null @@ -1,470 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.util.EnumSet; -import java.util.UUID; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.core.TraceScope; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.htrace.core.Tracer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * 
This is a wrapper around connection to datanode - * and understands checksum, offset etc. - * - * Terminology: - * <dl> - * <dt>block</dt> - * <dd>The hdfs block, typically large (~64MB). - * </dd> - * <dt>chunk</dt> - * <dd>A block is divided into chunks, each comes with a checksum. - * We want transfers to be chunk-aligned, to be able to - * verify checksums. - * </dd> - * <dt>packet</dt> - * <dd>A grouping of chunks used for transport. It contains a - * header, followed by checksum data, followed by real data. - * </dd> - * </dl> - * Please see DataNode for the RPC specification. - * - * This is a new implementation introduced in Hadoop 0.23 which - * is more efficient and simpler than the older BlockReader - * implementation. It should be renamed to RemoteBlockReader - * once we are confident in it. - */ -@InterfaceAudience.Private -public class RemoteBlockReader2 implements BlockReader { - - static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class); - static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB; - - final private Peer peer; - final private DatanodeID datanodeID; - final private PeerCache peerCache; - final private long blockId; - private final ReadableByteChannel in; - - private DataChecksum checksum; - private final PacketReceiver packetReceiver = new PacketReceiver(true); - - private ByteBuffer curDataSlice = null; - - /** offset in block of the last chunk received */ - private long lastSeqNo = -1; - - /** offset in block where reader wants to actually read */ - private long startOffset; - private final String filename; - - private final int bytesPerChecksum; - private final int checksumSize; - - /** - * The total number of bytes we need to transfer from the DN. - * This is the amount that the user has requested plus some padding - * at the beginning so that the read can begin on a chunk boundary. - */ - private long bytesNeededToFinish; - - /** - * True if we are reading from a local DataNode. - */ - private final boolean isLocal; - - private final boolean verifyChecksum; - - private boolean sentStatusCode = false; - - private final Tracer tracer; - - @VisibleForTesting - public Peer getPeer() { - return peer; - } - - @Override - public synchronized int read(byte[] buf, int off, int len) - throws IOException { - UUID randomId = (LOG.isTraceEnabled() ?
UUID.randomUUID() : null); - LOG.trace("Starting read #{} file {} from datanode {}", - randomId, filename, datanodeID.getHostName()); - - if (curDataSlice == null || - curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - try (TraceScope ignored = tracer.newScope( - "RemoteBlockReader2#readNextPacket(" + blockId + ")")) { - readNextPacket(); - } - } - - LOG.trace("Finishing read #{}", randomId); - - if (curDataSlice.remaining() == 0) { - // we're at EOF now - return -1; - } - - int nRead = Math.min(curDataSlice.remaining(), len); - curDataSlice.get(buf, off, nRead); - - return nRead; - } - - - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - if (curDataSlice == null || - (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) { - try (TraceScope ignored = tracer.newScope( - "RemoteBlockReader2#readNextPacket(" + blockId + ")")) { - readNextPacket(); - } - } - if (curDataSlice.remaining() == 0) { - // we're at EOF now - return -1; - } - - int nRead = Math.min(curDataSlice.remaining(), buf.remaining()); - ByteBuffer writeSlice = curDataSlice.duplicate(); - writeSlice.limit(writeSlice.position() + nRead); - buf.put(writeSlice); - curDataSlice.position(writeSlice.position()); - - return nRead; - } - - private void readNextPacket() throws IOException { - //Read packet headers. - packetReceiver.receiveNextPacket(in); - - PacketHeader curHeader = packetReceiver.getHeader(); - curDataSlice = packetReceiver.getDataSlice(); - assert curDataSlice.capacity() == curHeader.getDataLen(); - - LOG.trace("DFSClient readNextPacket got header {}", curHeader); - - // Sanity check the lengths - if (!curHeader.sanityCheck(lastSeqNo)) { - throw new IOException("BlockReader: error in packet header " + - curHeader); - } - - if (curHeader.getDataLen() > 0) { - int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; - int checksumsLen = chunks * checksumSize; - - assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : - "checksum slice capacity=" + - packetReceiver.getChecksumSlice().capacity() + - " checksumsLen=" + checksumsLen; - - lastSeqNo = curHeader.getSeqno(); - if (verifyChecksum && curDataSlice.remaining() > 0) { - // N.B.: the checksum error offset reported here is actually - // relative to the start of the block, not the start of the file. - // This is slightly misleading, but preserves the behavior from - // the older BlockReader. - checksum.verifyChunkedSums(curDataSlice, - packetReceiver.getChecksumSlice(), - filename, curHeader.getOffsetInBlock()); - } - bytesNeededToFinish -= curHeader.getDataLen(); - } - - // First packet will include some data prior to the first byte - // the user requested. Skip it. - if (curHeader.getOffsetInBlock() < startOffset) { - int newPos = (int) (startOffset - curHeader.getOffsetInBlock()); - curDataSlice.position(newPos); - } - - // If we've now satisfied the whole client read, read one last packet - // header, which should be empty - if (bytesNeededToFinish <= 0) { - readTrailingEmptyPacket(); - if (verifyChecksum) { - sendReadResult(Status.CHECKSUM_OK); - } else { - sendReadResult(Status.SUCCESS); - } - } - } - - @Override - public synchronized long skip(long n) throws IOException { - /* How can we make sure we don't throw a ChecksumException, at least - * in majority of the cases?. This one throws. 
*/ - long skipped = 0; - while (skipped < n) { - long needToSkip = n - skipped; - if (curDataSlice == null || - curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - readNextPacket(); - } - if (curDataSlice.remaining() == 0) { - // we're at EOF now - break; - } - - int skip = (int)Math.min(curDataSlice.remaining(), needToSkip); - curDataSlice.position(curDataSlice.position() + skip); - skipped += skip; - } - return skipped; - } - - private void readTrailingEmptyPacket() throws IOException { - LOG.trace("Reading empty packet at end of read"); - - packetReceiver.receiveNextPacket(in); - - PacketHeader trailer = packetReceiver.getHeader(); - if (!trailer.isLastPacketInBlock() || - trailer.getDataLen() != 0) { - throw new IOException("Expected empty end-of-read packet! Header: " + - trailer); - } - } - - protected RemoteBlockReader2(String file, long blockId, - DataChecksum checksum, boolean verifyChecksum, - long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) { - this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. - createSocketAddr(datanodeID.getXferAddr())); - // Path is used only for printing block and file information in debug - this.peer = peer; - this.datanodeID = datanodeID; - this.in = peer.getInputStreamChannel(); - this.checksum = checksum; - this.verifyChecksum = verifyChecksum; - this.startOffset = Math.max( startOffset, 0 ); - this.filename = file; - this.peerCache = peerCache; - this.blockId = blockId; - - // The total number of bytes that we need to transfer from the DN is - // the amount that the user wants (bytesToRead), plus the padding at - // the beginning in order to chunk-align. Note that the DN may elect - // to send more than this amount if the read starts/ends mid-chunk. - this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); - bytesPerChecksum = this.checksum.getBytesPerChecksum(); - checksumSize = this.checksum.getChecksumSize(); - this.tracer = tracer; - } - - - @Override - public synchronized void close() throws IOException { - packetReceiver.close(); - startOffset = -1; - checksum = null; - if (peerCache != null && sentStatusCode) { - peerCache.put(datanodeID, peer); - } else { - peer.close(); - } - - // in will be closed when its Socket is closed. - } - - /** - * When the reader reaches end of the read, it sends a status response - * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN - * closing our connection (which we will re-open), but won't affect - * data correctness. - */ - void sendReadResult(Status statusCode) { - assert !sentStatusCode : "already sent status code to " + peer; - try { - writeReadResult(peer.getOutputStream(), statusCode); - sentStatusCode = true; - } catch (IOException e) { - // It's ok not to be able to send this. But something is probably wrong. - LOG.info("Could not send read status (" + statusCode + ") to datanode " + - peer.getRemoteAddressString() + ": " + e.getMessage()); - } - } - - /** - * Serialize the actual read result on the wire. 
- */ - static void writeReadResult(OutputStream out, Status statusCode) - throws IOException { - - ClientReadStatusProto.newBuilder() - .setStatus(statusCode) - .build() - .writeDelimitedTo(out); - - out.flush(); - } - - /** - * File name to print when accessing a block directly (from servlets) - * @param s Address of the block location - * @param poolId Block pool ID of the block - * @param blockId Block ID of the block - * @return string that has a file name for debug purposes - */ - public static String getFileName(final InetSocketAddress s, - final String poolId, final long blockId) { - return s.toString() + ":" + poolId + ":" + blockId; - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return BlockReaderUtil.readAll(this, buf, offset, len); - } - - @Override - public void readFully(byte[] buf, int off, int len) throws IOException { - BlockReaderUtil.readFully(this, buf, off, len); - } - - /** - * Create a new BlockReader specifically to satisfy a read. - * This method also sends the OP_READ_BLOCK request. - * - * @param file File location - * @param block The block object - * @param blockToken The block token for security - * @param startOffset The read offset, relative to block head - * @param len The number of bytes to read - * @param verifyChecksum Whether to verify checksum - * @param clientName Client name - * @param peer The Peer to use - * @param datanodeID The DatanodeID this peer is connected to - * @return New BlockReader instance, or null on error. - */ - public static BlockReader newBlockReader(String file, - ExtendedBlock block, - Token<BlockTokenIdentifier> blockToken, - long startOffset, long len, - boolean verifyChecksum, - String clientName, - Peer peer, DatanodeID datanodeID, - PeerCache peerCache, - CachingStrategy cachingStrategy, - Tracer tracer) throws IOException { - // in and out will be closed when sock is closed (by the caller) - final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - peer.getOutputStream())); - new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, - verifyChecksum, cachingStrategy); - - // - // Get bytes in block - // - DataInputStream in = new DataInputStream(peer.getInputStream()); - - BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); - checkSuccess(status, peer, block, file); - ReadOpChecksumInfoProto checksumInfo = - status.getReadOpChecksumInfo(); - DataChecksum checksum = DataTransferProtoUtil.fromProto( - checksumInfo.getChecksum()); - //Warning when we get CHECKSUM_NULL? - - // Read the first chunk offset. 
- long firstChunkOffset = checksumInfo.getChunkOffset(); - - if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || - firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { - throw new IOException("BlockReader: error in first chunk offset (" + - firstChunkOffset + ") startOffset is " + - startOffset + " for file " + file); - } - - return new RemoteBlockReader2(file, block.getBlockId(), checksum, - verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, - peerCache, tracer); - } - - static void checkSuccess( - BlockOpResponseProto status, Peer peer, - ExtendedBlock block, String file) - throws IOException { - String logInfo = "for OP_READ_BLOCK" - + ", self=" + peer.getLocalAddressString() - + ", remote=" + peer.getRemoteAddressString() - + ", for file " + file - + ", for pool " + block.getBlockPoolId() - + " block " + block.getBlockId() + "_" + block.getGenerationStamp(); - DataTransferProtoUtil.checkBlockOpStatus(status, logInfo); - } - - @Override - public int available() { - // An optimistic estimate of how much data is available - // to us without doing network I/O. - return TCP_WINDOW_SIZE; - } - - @Override - public boolean isLocal() { - return isLocal; - } - - @Override - public boolean isShortCircuit() { - return false; - } - - @Override - public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java new file mode 100644 index 0000000..5486295 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java @@ -0,0 +1,875 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.client.impl; + +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.InetSocketAddress; +import java.util.List; + +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.ClientContext; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.RemotePeerFactory; +import org.apache.hadoop.hdfs.ReplicaAccessor; +import org.apache.hadoop.hdfs.ReplicaAccessorBuilder; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.net.DomainPeer; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; +import org.apache.hadoop.hdfs.util.IOUtilsClient; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.PerformanceAdvisory; +import org.apache.hadoop.util.Time; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.htrace.core.Tracer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utility class to create BlockReader implementations. 
+ */ [email protected] +public class BlockReaderFactory implements ShortCircuitReplicaCreator { + static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class); + + public static class FailureInjector { + public void injectRequestFileDescriptorsFailure() throws IOException { + // do nothing + } + public boolean getSupportsReceiptVerification() { + return true; + } + } + + @VisibleForTesting + static ShortCircuitReplicaCreator + createShortCircuitReplicaInfoCallback = null; + + private final DfsClientConf conf; + + /** + * Injects failures into specific operations during unit tests. + */ + private static FailureInjector failureInjector = new FailureInjector(); + + /** + * The file name, for logging and debugging purposes. + */ + private String fileName; + + /** + * The block ID and block pool ID to use. + */ + private ExtendedBlock block; + + /** + * The block token to use for security purposes. + */ + private Token<BlockTokenIdentifier> token; + + /** + * The offset within the block to start reading at. + */ + private long startOffset; + + /** + * If false, we won't try to verify the block checksum. + */ + private boolean verifyChecksum; + + /** + * The name of this client. + */ + private String clientName; + + /** + * The DataNode we're talking to. + */ + private DatanodeInfo datanode; + + /** + * StorageType of replica on DataNode. + */ + private StorageType storageType; + + /** + * If false, we won't try short-circuit local reads. + */ + private boolean allowShortCircuitLocalReads; + + /** + * The ClientContext to use for things like the PeerCache. + */ + private ClientContext clientContext; + + /** + * Number of bytes to read. Must be set to a non-negative value. + */ + private long length = -1; + + /** + * Caching strategy to use when reading the block. + */ + private CachingStrategy cachingStrategy; + + /** + * Socket address to use to connect to peer. + */ + private InetSocketAddress inetSocketAddress; + + /** + * Remote peer factory to use to create a peer, if needed. + */ + private RemotePeerFactory remotePeerFactory; + + /** + * UserGroupInformation to use for legacy block reader local objects, + * if needed. + */ + private UserGroupInformation userGroupInformation; + + /** + * Configuration to use for legacy block reader local objects, if needed. + */ + private Configuration configuration; + + /** + * The HTrace tracer to use. + */ + private Tracer tracer; + + /** + * Information about the domain socket path we should use to connect to the + * local peer-- or null if we haven't examined the local domain socket. + */ + private DomainSocketFactory.PathInfo pathInfo; + + /** + * The remaining number of times that we'll try to pull a socket out of the + * cache. 
+ */ + private int remainingCacheTries; + + public BlockReaderFactory(DfsClientConf conf) { + this.conf = conf; + this.remainingCacheTries = conf.getNumCachedConnRetry(); + } + + public BlockReaderFactory setFileName(String fileName) { + this.fileName = fileName; + return this; + } + + public BlockReaderFactory setBlock(ExtendedBlock block) { + this.block = block; + return this; + } + + public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) { + this.token = token; + return this; + } + + public BlockReaderFactory setStartOffset(long startOffset) { + this.startOffset = startOffset; + return this; + } + + public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) { + this.verifyChecksum = verifyChecksum; + return this; + } + + public BlockReaderFactory setClientName(String clientName) { + this.clientName = clientName; + return this; + } + + public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) { + this.datanode = datanode; + return this; + } + + public BlockReaderFactory setStorageType(StorageType storageType) { + this.storageType = storageType; + return this; + } + + public BlockReaderFactory setAllowShortCircuitLocalReads( + boolean allowShortCircuitLocalReads) { + this.allowShortCircuitLocalReads = allowShortCircuitLocalReads; + return this; + } + + public BlockReaderFactory setClientCacheContext( + ClientContext clientContext) { + this.clientContext = clientContext; + return this; + } + + public BlockReaderFactory setLength(long length) { + this.length = length; + return this; + } + + public BlockReaderFactory setCachingStrategy( + CachingStrategy cachingStrategy) { + this.cachingStrategy = cachingStrategy; + return this; + } + + public BlockReaderFactory setInetSocketAddress ( + InetSocketAddress inetSocketAddress) { + this.inetSocketAddress = inetSocketAddress; + return this; + } + + public BlockReaderFactory setUserGroupInformation( + UserGroupInformation userGroupInformation) { + this.userGroupInformation = userGroupInformation; + return this; + } + + public BlockReaderFactory setRemotePeerFactory( + RemotePeerFactory remotePeerFactory) { + this.remotePeerFactory = remotePeerFactory; + return this; + } + + public BlockReaderFactory setConfiguration( + Configuration configuration) { + this.configuration = configuration; + return this; + } + + public BlockReaderFactory setTracer(Tracer tracer) { + this.tracer = tracer; + return this; + } + + @VisibleForTesting + public static void setFailureInjectorForTesting(FailureInjector injector) { + failureInjector = injector; + } + + /** + * Build a BlockReader with the given options. + * + * This function will do the best it can to create a block reader that meets + * all of our requirements. We prefer short-circuit block readers + * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the + * former avoid the overhead of socket communication. If short-circuit is + * unavailable, our next fallback is data transfer over UNIX domain sockets, + * if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't + * work, we will try to create a remote block reader that operates over TCP + * sockets. + * + * There are a few caches that are important here. + * + * The ShortCircuitCache stores file descriptor objects which have been passed + * from the DataNode. + * + * The DomainSocketFactory stores information about UNIX domain socket paths + * that we not been able to use in the past, so that we don't waste time + * retrying them over and over. 
(Like all the caches, it does have a timeout, + * though.) + * + * The PeerCache stores peers that we have used in the past. If we can reuse + * one of these peers, we avoid the overhead of re-opening a socket. However, + * if the socket has been timed out on the remote end, our attempt to reuse + * the socket may end with an IOException. For that reason, we limit our + * attempts at socket reuse to dfs.client.cached.conn.retry times. After + * that, we create new sockets. This avoids the problem where a thread tries + * to talk to a peer that it hasn't talked to in a while, and has to clean out + * every entry in a socket cache full of stale entries. + * + * @return The new BlockReader. We will not return null. + * + * @throws InvalidToken + * If the block token was invalid. + * InvalidEncryptionKeyException + * If the encryption key was invalid. + * Other IOException + * If there was another problem. + */ + public BlockReader build() throws IOException { + Preconditions.checkNotNull(configuration); + Preconditions + .checkState(length >= 0, "Length must be set to a non-negative value"); + BlockReader reader = tryToCreateExternalBlockReader(); + if (reader != null) { + return reader; + } + final ShortCircuitConf scConf = conf.getShortCircuitConf(); + if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) { + if (clientContext.getUseLegacyBlockReaderLocal()) { + reader = getLegacyBlockReaderLocal(); + if (reader != null) { + LOG.trace("{}: returning new legacy block reader local.", this); + return reader; + } + } else { + reader = getBlockReaderLocal(); + if (reader != null) { + LOG.trace("{}: returning new block reader local.", this); + return reader; + } + } + } + if (scConf.isDomainSocketDataTraffic()) { + reader = getRemoteBlockReaderFromDomain(); + if (reader != null) { + LOG.trace("{}: returning new remote block reader using UNIX domain " + + "socket on {}", this, pathInfo.getPath()); + return reader; + } + } + Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting, + "TCP reads were disabled for testing, but we failed to " + + "do a non-TCP read."); + return getRemoteBlockReaderFromTcp(); + } + + private BlockReader tryToCreateExternalBlockReader() { + List<Class<? extends ReplicaAccessorBuilder>> clses = + conf.getReplicaAccessorBuilderClasses(); + for (Class<? extends ReplicaAccessorBuilder> cls : clses) { + try { + ByteArrayDataOutput bado = ByteStreams.newDataOutput(); + token.write(bado); + byte tokenBytes[] = bado.toByteArray(); + + Constructor<? extends ReplicaAccessorBuilder> ctor = + cls.getConstructor(); + ReplicaAccessorBuilder builder = ctor.newInstance(); + long visibleLength = startOffset + length; + ReplicaAccessor accessor = builder. + setAllowShortCircuitReads(allowShortCircuitLocalReads). + setBlock(block.getBlockId(), block.getBlockPoolId()). + setGenerationStamp(block.getGenerationStamp()). + setBlockAccessToken(tokenBytes). + setClientName(clientName). + setConfiguration(configuration). + setFileName(fileName). + setVerifyChecksum(verifyChecksum). + setVisibleLength(visibleLength). + build(); + if (accessor == null) { + LOG.trace("{}: No ReplicaAccessor created by {}", + this, cls.getName()); + } else { + return new ExternalBlockReader(accessor, visibleLength, startOffset); + } + } catch (Throwable t) { + LOG.warn("Failed to construct new object of type " + + cls.getName(), t); + } + } + return null; + } + + + /** + * Get {@link BlockReaderLocalLegacy} for short circuited local reads. 
+ * This block reader implements the path-based style of local reads + * first introduced in HDFS-2246. + */ + private BlockReader getLegacyBlockReaderLocal() throws IOException { + LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this); + if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) { + LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address" + + "{} is not local", this, inetSocketAddress); + return null; + } + if (clientContext.getDisableLegacyBlockReaderLocal()) { + PerformanceAdvisory.LOG.debug("{}: can't construct " + + "BlockReaderLocalLegacy because " + + "disableLegacyBlockReaderLocal is set.", this); + return null; + } + IOException ioe; + try { + return BlockReaderLocalLegacy.newBlockReader(conf, + userGroupInformation, configuration, fileName, block, token, + datanode, startOffset, length, storageType, tracer); + } catch (RemoteException remoteException) { + ioe = remoteException.unwrapRemoteException( + InvalidToken.class, AccessControlException.class); + } catch (IOException e) { + ioe = e; + } + if ((!(ioe instanceof AccessControlException)) && + isSecurityException(ioe)) { + // Handle security exceptions. + // We do not handle AccessControlException here, since + // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate + // that the user is not in dfs.block.local-path-access.user, a condition + // which requires us to disable legacy SCR. + throw ioe; + } + LOG.warn(this + ": error creating legacy BlockReaderLocal. " + + "Disabling legacy local reads.", ioe); + clientContext.setDisableLegacyBlockReaderLocal(); + return null; + } + + private BlockReader getBlockReaderLocal() throws InvalidToken { + LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit " + + " reads.", this); + if (pathInfo == null) { + pathInfo = clientContext.getDomainSocketFactory() + .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); + } + if (!pathInfo.getPathState().getUsableForShortCircuit()) { + PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " + + "giving up on BlockReaderLocal.", this, pathInfo); + return null; + } + ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), + block.getBlockPoolId()); + ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); + InvalidToken exc = info.getInvalidTokenException(); + if (exc != null) { + LOG.trace("{}: got InvalidToken exception while trying to construct " + + "BlockReaderLocal via {}", this, pathInfo.getPath()); + throw exc; + } + if (info.getReplica() == null) { + PerformanceAdvisory.LOG.debug("{}: failed to get " + + "ShortCircuitReplica. Cannot construct " + + "BlockReaderLocal via {}", this, pathInfo.getPath()); + return null; + } + return new BlockReaderLocal.Builder(conf.getShortCircuitConf()). + setFilename(fileName). + setBlock(block). + setStartOffset(startOffset). + setShortCircuitReplica(info.getReplica()). + setVerifyChecksum(verifyChecksum). + setCachingStrategy(cachingStrategy). + setStorageType(storageType). + setTracer(tracer). + build(); + } + + /** + * Fetch a pair of short-circuit block descriptors from a local DataNode. + * + * @return Null if we could not communicate with the datanode, + * a new ShortCircuitReplicaInfo object otherwise. + * ShortCircuitReplicaInfo objects may contain either an + * InvalidToken exception, or a ShortCircuitReplica object ready to + * use. 
+ */ + @Override + public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { + if (createShortCircuitReplicaInfoCallback != null) { + ShortCircuitReplicaInfo info = + createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo(); + if (info != null) return info; + } + LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this); + BlockReaderPeer curPeer; + while (true) { + curPeer = nextDomainPeer(); + if (curPeer == null) break; + if (curPeer.fromCache) remainingCacheTries--; + DomainPeer peer = (DomainPeer)curPeer.peer; + Slot slot = null; + ShortCircuitCache cache = clientContext.getShortCircuitCache(); + try { + MutableBoolean usedPeer = new MutableBoolean(false); + slot = cache.allocShmSlot(datanode, peer, usedPeer, + new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()), + clientName); + if (usedPeer.booleanValue()) { + LOG.trace("{}: allocShmSlot used up our previous socket {}. " + + "Allocating a new one...", this, peer.getDomainSocket()); + curPeer = nextDomainPeer(); + if (curPeer == null) break; + peer = (DomainPeer)curPeer.peer; + } + ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot); + clientContext.getPeerCache().put(datanode, peer); + return info; + } catch (IOException e) { + if (slot != null) { + cache.freeSlot(slot); + } + if (curPeer.fromCache) { + // Handle an I/O error we got when using a cached socket. + // These are considered less serious, because the socket may be stale. + LOG.debug("{}: closing stale domain peer {}", this, peer, e); + IOUtilsClient.cleanup(LOG, peer); + } else { + // Handle an I/O error we got when using a newly created socket. + // We temporarily disable the domain socket path for a few minutes in + // this case, to prevent wasting more time on it. + LOG.warn(this + ": I/O error requesting file descriptors. " + + "Disabling domain socket " + peer.getDomainSocket(), e); + IOUtilsClient.cleanup(LOG, peer); + clientContext.getDomainSocketFactory() + .disableDomainSocketPath(pathInfo.getPath()); + return null; + } + } + } + return null; + } + + /** + * Request file descriptors from a DomainPeer. + * + * @param peer The peer to use for communication. + * @param slot If non-null, the shared memory slot to associate with the + * new ShortCircuitReplica. + * + * @return A ShortCircuitReplica object if we could communicate with the + * datanode; null, otherwise. + * @throws IOException If we encountered an I/O exception while communicating + * with the datanode. + */ + private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, + Slot slot) throws IOException { + ShortCircuitCache cache = clientContext.getShortCircuitCache(); + final DataOutputStream out = + new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); + SlotId slotId = slot == null ? 
null : slot.getSlotId(); + new Sender(out).requestShortCircuitFds(block, token, slotId, 1, + failureInjector.getSupportsReceiptVerification()); + DataInputStream in = new DataInputStream(peer.getInputStream()); + BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(in)); + DomainSocket sock = peer.getDomainSocket(); + failureInjector.injectRequestFileDescriptorsFailure(); + switch (resp.getStatus()) { + case SUCCESS: + byte buf[] = new byte[1]; + FileInputStream[] fis = new FileInputStream[2]; + sock.recvFileInputStreams(fis, buf, 0, buf.length); + ShortCircuitReplica replica = null; + try { + ExtendedBlockId key = + new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); + if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) { + LOG.trace("Sending receipt verification byte for slot {}", slot); + sock.getOutputStream().write(0); + } + replica = new ShortCircuitReplica(key, fis[0], fis[1], cache, + Time.monotonicNow(), slot); + return new ShortCircuitReplicaInfo(replica); + } catch (IOException e) { + // This indicates an error reading from disk, or a format error. Since + // it's not a socket communication problem, we return null rather than + // throwing an exception. + LOG.warn(this + ": error creating ShortCircuitReplica.", e); + return null; + } finally { + if (replica == null) { + IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]); + } + } + case ERROR_UNSUPPORTED: + if (!resp.hasShortCircuitAccessVersion()) { + LOG.warn("short-circuit read access is disabled for " + + "DataNode " + datanode + ". reason: " + resp.getMessage()); + clientContext.getDomainSocketFactory() + .disableShortCircuitForPath(pathInfo.getPath()); + } else { + LOG.warn("short-circuit read access for the file " + + fileName + " is disabled for DataNode " + datanode + + ". reason: " + resp.getMessage()); + } + return null; + case ERROR_ACCESS_TOKEN: + String msg = "access control error while " + + "attempting to set up short-circuit access to " + + fileName + resp.getMessage(); + LOG.debug("{}:{}", this, msg); + return new ShortCircuitReplicaInfo(new InvalidToken(msg)); + default: + LOG.warn(this + ": unknown response code " + resp.getStatus() + + " while attempting to set up short-circuit access. " + + resp.getMessage()); + clientContext.getDomainSocketFactory() + .disableShortCircuitForPath(pathInfo.getPath()); + return null; + } + } + + /** + * Get a BlockReaderRemote that communicates over a UNIX domain socket. + * + * @return The new BlockReader, or null if we failed to create the block + * reader. + * + * @throws InvalidToken If the block token was invalid. + * Potentially other security-related execptions. 
+ */ + private BlockReader getRemoteBlockReaderFromDomain() throws IOException { + if (pathInfo == null) { + pathInfo = clientContext.getDomainSocketFactory() + .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); + } + if (!pathInfo.getPathState().getUsableForDataTransfer()) { + PerformanceAdvisory.LOG.debug("{}: not trying to create a " + + "remote block reader because the UNIX domain socket at {}" + + " is not usable.", this, pathInfo); + return null; + } + LOG.trace("{}: trying to create a remote block reader from the UNIX domain " + + "socket at {}", this, pathInfo.getPath()); + + while (true) { + BlockReaderPeer curPeer = nextDomainPeer(); + if (curPeer == null) break; + if (curPeer.fromCache) remainingCacheTries--; + DomainPeer peer = (DomainPeer)curPeer.peer; + BlockReader blockReader = null; + try { + blockReader = getRemoteBlockReader(peer); + return blockReader; + } catch (IOException ioe) { + IOUtilsClient.cleanup(LOG, peer); + if (isSecurityException(ioe)) { + LOG.trace("{}: got security exception while constructing a remote " + + " block reader from the unix domain socket at {}", + this, pathInfo.getPath(), ioe); + throw ioe; + } + if (curPeer.fromCache) { + // Handle an I/O error we got when using a cached peer. These are + // considered less serious because the underlying socket may be stale. + LOG.debug("Closed potentially stale domain peer {}", peer, ioe); + } else { + // Handle an I/O error we got when using a newly created domain peer. + // We temporarily disable the domain socket path for a few minutes in + // this case, to prevent wasting more time on it. + LOG.warn("I/O error constructing remote block reader. Disabling " + + "domain socket " + peer.getDomainSocket(), ioe); + clientContext.getDomainSocketFactory() + .disableDomainSocketPath(pathInfo.getPath()); + return null; + } + } finally { + if (blockReader == null) { + IOUtilsClient.cleanup(LOG, peer); + } + } + } + return null; + } + + /** + * Get a BlockReaderRemote that communicates over a TCP socket. + * + * @return The new BlockReader. We will not return null, but instead throw + * an exception if this fails. + * + * @throws InvalidToken + * If the block token was invalid. + * InvalidEncryptionKeyException + * If the encryption key was invalid. + * Other IOException + * If there was another problem. + */ + private BlockReader getRemoteBlockReaderFromTcp() throws IOException { + LOG.trace("{}: trying to create a remote block reader from a TCP socket", + this); + BlockReader blockReader = null; + while (true) { + BlockReaderPeer curPeer = null; + Peer peer = null; + try { + curPeer = nextTcpPeer(); + if (curPeer.fromCache) remainingCacheTries--; + peer = curPeer.peer; + blockReader = getRemoteBlockReader(peer); + return blockReader; + } catch (IOException ioe) { + if (isSecurityException(ioe)) { + LOG.trace("{}: got security exception while constructing a remote " + + "block reader from {}", this, peer, ioe); + throw ioe; + } + if ((curPeer != null) && curPeer.fromCache) { + // Handle an I/O error we got when using a cached peer. These are + // considered less serious, because the underlying socket may be + // stale. + LOG.debug("Closed potentially stale remote peer {}", peer, ioe); + } else { + // Handle an I/O error we got when using a newly created peer. 
+ LOG.warn("I/O error constructing remote block reader.", ioe); + throw ioe; + } + } finally { + if (blockReader == null) { + IOUtilsClient.cleanup(LOG, peer); + } + } + } + } + + public static class BlockReaderPeer { + final Peer peer; + final boolean fromCache; + + BlockReaderPeer(Peer peer, boolean fromCache) { + this.peer = peer; + this.fromCache = fromCache; + } + } + + /** + * Get the next DomainPeer-- either from the cache or by creating it. + * + * @return the next DomainPeer, or null if we could not construct one. + */ + private BlockReaderPeer nextDomainPeer() { + if (remainingCacheTries > 0) { + Peer peer = clientContext.getPeerCache().get(datanode, true); + if (peer != null) { + LOG.trace("nextDomainPeer: reusing existing peer {}", peer); + return new BlockReaderPeer(peer, true); + } + } + DomainSocket sock = clientContext.getDomainSocketFactory(). + createSocket(pathInfo, conf.getSocketTimeout()); + if (sock == null) return null; + return new BlockReaderPeer(new DomainPeer(sock), false); + } + + /** + * Get the next TCP-based peer-- either from the cache or by creating it. + * + * @return the next Peer, or null if we could not construct one. + * + * @throws IOException If there was an error while constructing the peer + * (such as an InvalidEncryptionKeyException) + */ + private BlockReaderPeer nextTcpPeer() throws IOException { + if (remainingCacheTries > 0) { + Peer peer = clientContext.getPeerCache().get(datanode, false); + if (peer != null) { + LOG.trace("nextTcpPeer: reusing existing peer {}", peer); + return new BlockReaderPeer(peer, true); + } + } + try { + Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token, + datanode); + LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer); + return new BlockReaderPeer(peer, false); + } catch (IOException e) { + LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to" + + "{}", datanode); + throw e; + } + } + + /** + * Determine if an exception is security-related. + * + * We need to handle these exceptions differently than other IOExceptions. + * They don't indicate a communication problem. Instead, they mean that there + * is some action the client needs to take, such as refetching block tokens, + * renewing encryption keys, etc. + * + * @param ioe The exception + * @return True only if the exception is security-related. 
+ */ + private static boolean isSecurityException(IOException ioe) { + return (ioe instanceof InvalidToken) || + (ioe instanceof InvalidEncryptionKeyException) || + (ioe instanceof InvalidBlockTokenException) || + (ioe instanceof AccessControlException); + } + + @SuppressWarnings("deprecation") + private BlockReader getRemoteBlockReader(Peer peer) throws IOException { + if (conf.getShortCircuitConf().isUseLegacyBlockReader()) { + return BlockReaderRemote.newBlockReader(fileName, + block, token, startOffset, length, conf.getIoBufferSize(), + verifyChecksum, clientName, peer, datanode, + clientContext.getPeerCache(), cachingStrategy, tracer); + } else { + return BlockReaderRemote2.newBlockReader( + fileName, block, token, startOffset, length, + verifyChecksum, clientName, peer, datanode, + clientContext.getPeerCache(), cachingStrategy, tracer); + } + } + + @Override + public String toString() { + return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")"; + } + + /** + * File name to print when accessing a block directly (from servlets) + * @param s Address of the block location + * @param poolId Block pool ID of the block + * @param blockId Block ID of the block + * @return string that has a file name for debug purposes + */ + public static String getFileName(final InetSocketAddress s, + final String poolId, final long blockId) { + return s.toString() + ":" + poolId + ":" + blockId; + } +}
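
The build() method in this hunk works through a fixed preference order: an external ReplicaAccessor if one is configured, then a short-circuit local reader, then a reader over a UNIX domain socket, and finally a plain TCP reader, with each step returning null to mean "fall through to the next option." What follows is a rough, self-contained sketch of that fall-through pattern only; the class and method names are hypothetical and are not part of the HDFS client API.

import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Illustrative sketch of the fall-through strategy used by build():
// each strategy returns null to mean "try the next option"; the last one must succeed.
public class TieredReaderExample {

  static String tryShortCircuit() { return null; }          // e.g. no local replica available
  static String tryDomainSocket() { return null; }          // e.g. no usable domain socket path
  static String tryTcp()          { return "tcp-reader"; }  // last resort; never returns null

  static String buildReader() {
    List<Supplier<String>> strategies =
        Arrays.asList(TieredReaderExample::tryShortCircuit,
                      TieredReaderExample::tryDomainSocket,
                      TieredReaderExample::tryTcp);
    for (Supplier<String> strategy : strategies) {
      String reader = strategy.get();
      if (reader != null) {
        return reader;   // the first strategy that produces a reader wins
      }
    }
    throw new IllegalStateException("no strategy produced a reader");
  }

  public static void main(String[] args) {
    System.out.println("Selected: " + buildReader());   // prints "Selected: tcp-reader"
  }
}

The ordering matters: the cheaper, lower-latency paths (short-circuit and domain-socket reads) are attempted first, and the TCP path is kept as the one strategy that must either succeed or throw.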

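The class comment at the start of this hunk explains why reuse of cached peers is bounded: a socket taken from the PeerCache may already have been closed on the remote end, so the client charges each cached attempt against dfs.client.cached.conn.retry (tracked as remainingCacheTries in nextDomainPeer() and nextTcpPeer()) and switches to freshly created sockets once that budget is spent. A rough, standalone sketch of that bounded-reuse loop follows; the names are hypothetical, and unlike the real client it assumes a freshly created connection always succeeds.

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: bounded reuse of possibly-stale cached connections,
// falling back to newly created ones after the retry budget is exhausted.
public class CachedPeerRetryExample {

  interface Conn { String read() throws IOException; }

  private final Deque<Conn> cache = new ArrayDeque<>();
  private int remainingCacheTries;

  CachedPeerRetryExample(int cachedConnRetry) {
    this.remainingCacheTries = cachedConnRetry;
  }

  void cachePeer(Conn conn) { cache.push(conn); }

  private Conn nextConn() {
    if (remainingCacheTries > 0 && !cache.isEmpty()) {
      remainingCacheTries--;                     // charge each cached attempt against the budget
      return cache.poll();                       // may be stale and fail on first use
    }
    return () -> "data-from-fresh-socket";       // create a brand-new connection instead
  }

  String readWithRetry() throws IOException {
    while (true) {
      Conn conn = nextConn();
      try {
        return conn.read();
      } catch (IOException e) {
        // A failure on a cached connection is treated as minor: drop it and try again.
        // Once the budget is spent, nextConn() only hands out fresh connections.
      }
    }
  }

  public static void main(String[] args) throws IOException {
    CachedPeerRetryExample client = new CachedPeerRetryExample(2);
    // Seed the cache with a connection that has gone stale on the remote end.
    client.cachePeer(() -> { throw new IOException("stale socket"); });
    System.out.println(client.readWithRetry());  // falls back and prints data-from-fresh-socket
  }
}

This mirrors the reason given in the comment: without a bound, a thread returning to a cold cache full of stale entries would have to absorb one IOException per entry before making progress.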