http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java new file mode 100644 index 0000000..7349fc2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java @@ -0,0 +1,720 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DirectBufferPool; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * BlockReaderLocal enables local short circuited reads. If the DFS client is on + * the same machine as the datanode, then the client can read files directly + * from the local file system rather than going through the datanode for better + * performance. <br> + * {@link BlockReaderLocal} works as follows: + * <ul> + * <li>The client performing short circuit reads must be configured at the + * datanode.</li> + * <li>The client gets the file descriptors for the metadata file and the data + * file for the block using + * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}. 
+ * </li> + * <li>The client reads the file descriptors.</li> + * </ul> + */ [email protected] +class BlockReaderLocal implements BlockReader { + static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class); + + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + + public static class Builder { + private final int bufferSize; + private boolean verifyChecksum; + private int maxReadahead; + private String filename; + private ShortCircuitReplica replica; + private long dataPos; + private ExtendedBlock block; + private StorageType storageType; + private Tracer tracer; + + public Builder(ShortCircuitConf conf) { + this.maxReadahead = Integer.MAX_VALUE; + this.verifyChecksum = !conf.isSkipShortCircuitChecksums(); + this.bufferSize = conf.getShortCircuitBufferSize(); + } + + public Builder setVerifyChecksum(boolean verifyChecksum) { + this.verifyChecksum = verifyChecksum; + return this; + } + + public Builder setCachingStrategy(CachingStrategy cachingStrategy) { + long readahead = cachingStrategy.getReadahead() != null ? + cachingStrategy.getReadahead() : + HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT; + this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead); + return this; + } + + public Builder setFilename(String filename) { + this.filename = filename; + return this; + } + + public Builder setShortCircuitReplica(ShortCircuitReplica replica) { + this.replica = replica; + return this; + } + + public Builder setStartOffset(long startOffset) { + this.dataPos = Math.max(0, startOffset); + return this; + } + + public Builder setBlock(ExtendedBlock block) { + this.block = block; + return this; + } + + public Builder setStorageType(StorageType storageType) { + this.storageType = storageType; + return this; + } + + public Builder setTracer(Tracer tracer) { + this.tracer = tracer; + return this; + } + + public BlockReaderLocal build() { + Preconditions.checkNotNull(replica); + return new BlockReaderLocal(this); + } + } + + private boolean closed = false; + + /** + * Pair of streams for this block. + */ + private final ShortCircuitReplica replica; + + /** + * The data FileChannel. + */ + private final FileChannel dataIn; + + /** + * The next place we'll read from in the block data FileChannel. + * + * If data is buffered in dataBuf, this offset will be larger than the + * offset of the next byte which a read() operation will give us. + */ + private long dataPos; + + /** + * The Checksum FileChannel. + */ + private final FileChannel checksumIn; + + /** + * Checksum type and size. + */ + private final DataChecksum checksum; + + /** + * If false, we will always skip the checksum. + */ + private final boolean verifyChecksum; + + /** + * Name of the block, for logging purposes. + */ + private final String filename; + + /** + * Block ID and Block Pool ID. + */ + private final ExtendedBlock block; + + /** + * Cache of Checksum#bytesPerChecksum. + */ + private final int bytesPerChecksum; + + /** + * Cache of Checksum#checksumSize. + */ + private final int checksumSize; + + /** + * Maximum number of chunks to allocate. + * + * This is used to allocate dataBuf and checksumBuf, in the event that + * we need them. + */ + private final int maxAllocatedChunks; + + /** + * True if zero readahead was requested. + */ + private final boolean zeroReadaheadRequested; + + /** + * Maximum amount of readahead we'll do. This will always be at least the, + * size of a single chunk, even if {@link #zeroReadaheadRequested} is true. 
+ * The reason is because we need to do a certain amount of buffering in order + * to do checksumming. + * + * This determines how many bytes we'll use out of dataBuf and checksumBuf. + * Why do we allocate buffers, and then (potentially) only use part of them? + * The rationale is that allocating a lot of buffers of different sizes would + * make it very difficult for the DirectBufferPool to re-use buffers. + */ + private final int maxReadaheadLength; + + /** + * Buffers data starting at the current dataPos and extending on + * for dataBuf.limit(). + * + * This may be null if we don't need it. + */ + private ByteBuffer dataBuf; + + /** + * Buffers checksums starting at the current checksumPos and extending on + * for checksumBuf.limit(). + * + * This may be null if we don't need it. + */ + private ByteBuffer checksumBuf; + + /** + * StorageType of replica on DataNode. + */ + private StorageType storageType; + + /** + * The Tracer to use. + */ + private final Tracer tracer; + + private BlockReaderLocal(Builder builder) { + this.replica = builder.replica; + this.dataIn = replica.getDataStream().getChannel(); + this.dataPos = builder.dataPos; + this.checksumIn = replica.getMetaStream().getChannel(); + BlockMetadataHeader header = builder.replica.getMetaHeader(); + this.checksum = header.getChecksum(); + this.verifyChecksum = builder.verifyChecksum && + (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL); + this.filename = builder.filename; + this.block = builder.block; + this.bytesPerChecksum = checksum.getBytesPerChecksum(); + this.checksumSize = checksum.getChecksumSize(); + + this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 : + ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum); + // Calculate the effective maximum readahead. + // We can't do more readahead than there is space in the buffer. + int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 : + ((Math.min(builder.bufferSize, builder.maxReadahead) + + bytesPerChecksum - 1) / bytesPerChecksum); + if (maxReadaheadChunks == 0) { + this.zeroReadaheadRequested = true; + maxReadaheadChunks = 1; + } else { + this.zeroReadaheadRequested = false; + } + this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum; + this.storageType = builder.storageType; + this.tracer = builder.tracer; + } + + private synchronized void createDataBufIfNeeded() { + if (dataBuf == null) { + dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum); + dataBuf.position(0); + dataBuf.limit(0); + } + } + + private synchronized void freeDataBufIfExists() { + if (dataBuf != null) { + // When disposing of a dataBuf, we have to move our stored file index + // backwards. + dataPos -= dataBuf.remaining(); + dataBuf.clear(); + bufferPool.returnBuffer(dataBuf); + dataBuf = null; + } + } + + private synchronized void createChecksumBufIfNeeded() { + if (checksumBuf == null) { + checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize); + checksumBuf.position(0); + checksumBuf.limit(0); + } + } + + private synchronized void freeChecksumBufIfExists() { + if (checksumBuf != null) { + checksumBuf.clear(); + bufferPool.returnBuffer(checksumBuf); + checksumBuf = null; + } + } + + private synchronized int drainDataBuf(ByteBuffer buf) { + if (dataBuf == null) return -1; + int oldLimit = dataBuf.limit(); + int nRead = Math.min(dataBuf.remaining(), buf.remaining()); + if (nRead == 0) { + return (dataBuf.remaining() == 0) ? 
-1 : 0; + } + try { + dataBuf.limit(dataBuf.position() + nRead); + buf.put(dataBuf); + } finally { + dataBuf.limit(oldLimit); + } + return nRead; + } + + /** + * Read from the block file into a buffer. + * + * This function overwrites checksumBuf. It will increment dataPos. + * + * @param buf The buffer to read into. May be dataBuf. + * The position and limit of this buffer should be set to + * multiples of the checksum size. + * @param canSkipChecksum True if we can skip checksumming. + * + * @return Total bytes read. 0 on EOF. + */ + private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum) + throws IOException { + try (TraceScope ignored = tracer.newScope( + "BlockReaderLocal#fillBuffer(" + block.getBlockId() + ")")) { + int total = 0; + long startDataPos = dataPos; + int startBufPos = buf.position(); + while (buf.hasRemaining()) { + int nRead = dataIn.read(buf, dataPos); + if (nRead < 0) { + break; + } + dataPos += nRead; + total += nRead; + } + if (canSkipChecksum) { + freeChecksumBufIfExists(); + return total; + } + if (total > 0) { + try { + buf.limit(buf.position()); + buf.position(startBufPos); + createChecksumBufIfNeeded(); + int checksumsNeeded = (total + bytesPerChecksum - 1) / + bytesPerChecksum; + checksumBuf.clear(); + checksumBuf.limit(checksumsNeeded * checksumSize); + long checksumPos = BlockMetadataHeader.getHeaderSize() + + ((startDataPos / bytesPerChecksum) * checksumSize); + while (checksumBuf.hasRemaining()) { + int nRead = checksumIn.read(checksumBuf, checksumPos); + if (nRead < 0) { + throw new IOException("Got unexpected checksum file EOF at " + + checksumPos + ", block file position " + startDataPos + + " for block " + block + " of file " + filename); + } + checksumPos += nRead; + } + checksumBuf.flip(); + + checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos); + } finally { + buf.position(buf.limit()); + } + } + return total; + } + } + + private boolean createNoChecksumContext() { + return !verifyChecksum || + // Checksums are not stored for replicas on transient storage. We do + // not anchor, because we do not intend for client activity to block + // eviction from transient storage on the DataNode side. 
+ (storageType != null && storageType.isTransient()) || + replica.addNoChecksumAnchor(); + } + + private void releaseNoChecksumContext() { + if (verifyChecksum) { + if (storageType == null || !storageType.isTransient()) { + replica.removeNoChecksumAnchor(); + } + } + } + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + boolean canSkipChecksum = createNoChecksumContext(); + try { + String traceFormatStr = "read(buf.remaining={}, block={}, filename={}, " + + "canSkipChecksum={})"; + LOG.trace(traceFormatStr + ": starting", + buf.remaining(), block, filename, canSkipChecksum); + int nRead; + try { + if (canSkipChecksum && zeroReadaheadRequested) { + nRead = readWithoutBounceBuffer(buf); + } else { + nRead = readWithBounceBuffer(buf, canSkipChecksum); + } + } catch (IOException e) { + LOG.trace(traceFormatStr + ": I/O error", + buf.remaining(), block, filename, canSkipChecksum, e); + throw e; + } + LOG.trace(traceFormatStr + ": returning {}", + buf.remaining(), block, filename, canSkipChecksum, nRead); + return nRead; + } finally { + if (canSkipChecksum) releaseNoChecksumContext(); + } + } + + private synchronized int readWithoutBounceBuffer(ByteBuffer buf) + throws IOException { + freeDataBufIfExists(); + freeChecksumBufIfExists(); + int total = 0; + while (buf.hasRemaining()) { + int nRead = dataIn.read(buf, dataPos); + if (nRead <= 0) break; + dataPos += nRead; + total += nRead; + } + return (total == 0 && (dataPos == dataIn.size())) ? -1 : total; + } + + /** + * Fill the data buffer. If necessary, validate the data against the + * checksums. + * + * We always want the offsets of the data contained in dataBuf to be + * aligned to the chunk boundary. If we are validating checksums, we + * accomplish this by seeking backwards in the file until we're on a + * chunk boundary. (This is necessary because we can't checksum a + * partial chunk.) If we are not validating checksums, we simply only + * fill the latter part of dataBuf. + * + * @param canSkipChecksum true if we can skip checksumming. + * @return true if we hit EOF. + * @throws IOException + */ + private synchronized boolean fillDataBuf(boolean canSkipChecksum) + throws IOException { + createDataBufIfNeeded(); + final int slop = (int)(dataPos % bytesPerChecksum); + final long oldDataPos = dataPos; + dataBuf.limit(maxReadaheadLength); + if (canSkipChecksum) { + dataBuf.position(slop); + fillBuffer(dataBuf, true); + } else { + dataPos -= slop; + dataBuf.position(0); + fillBuffer(dataBuf, false); + } + dataBuf.limit(dataBuf.position()); + dataBuf.position(Math.min(dataBuf.position(), slop)); + LOG.trace("loaded {} bytes into bounce buffer from offset {} of {}", + dataBuf.remaining(), oldDataPos, block); + return dataBuf.limit() != maxReadaheadLength; + } + + /** + * Read using the bounce buffer. + * + * A 'direct' read actually has three phases. The first drains any + * remaining bytes from the slow read buffer. After this the read is + * guaranteed to be on a checksum chunk boundary. If there are still bytes + * to read, the fast direct path is used for as many remaining bytes as + * possible, up to a multiple of the checksum chunk size. Finally, any + * 'odd' bytes remaining at the end of the read cause another slow read to + * be issued, which involves an extra copy. + * + * Every 'slow' read tries to fill the slow read buffer in one go for + * efficiency's sake. As described above, all non-checksum-chunk-aligned + * reads will be served from the slower read path. 
+ * + * @param buf The buffer to read into. + * @param canSkipChecksum True if we can skip checksums. + */ + private synchronized int readWithBounceBuffer(ByteBuffer buf, + boolean canSkipChecksum) throws IOException { + int total = 0; + int bb = drainDataBuf(buf); // drain bounce buffer if possible + if (bb >= 0) { + total += bb; + if (buf.remaining() == 0) return total; + } + boolean eof = true, done = false; + do { + if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength) + && ((dataPos % bytesPerChecksum) == 0)) { + // Fast lane: try to read directly into user-supplied buffer, bypassing + // bounce buffer. + int oldLimit = buf.limit(); + int nRead; + try { + buf.limit(buf.position() + maxReadaheadLength); + nRead = fillBuffer(buf, canSkipChecksum); + } finally { + buf.limit(oldLimit); + } + if (nRead < maxReadaheadLength) { + done = true; + } + if (nRead > 0) { + eof = false; + } + total += nRead; + } else { + // Slow lane: refill bounce buffer. + if (fillDataBuf(canSkipChecksum)) { + done = true; + } + bb = drainDataBuf(buf); // drain bounce buffer if possible + if (bb >= 0) { + eof = false; + total += bb; + } + } + } while ((!done) && (buf.remaining() > 0)); + return (eof && total == 0) ? -1 : total; + } + + @Override + public synchronized int read(byte[] arr, int off, int len) + throws IOException { + boolean canSkipChecksum = createNoChecksumContext(); + int nRead; + try { + final String traceFormatStr = "read(arr.length={}, off={}, len={}, " + + "filename={}, block={}, canSkipChecksum={})"; + LOG.trace(traceFormatStr + ": starting", + arr.length, off, len, filename, block, canSkipChecksum); + try { + if (canSkipChecksum && zeroReadaheadRequested) { + nRead = readWithoutBounceBuffer(arr, off, len); + } else { + nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum); + } + } catch (IOException e) { + LOG.trace(traceFormatStr + ": I/O error", + arr.length, off, len, filename, block, canSkipChecksum, e); + throw e; + } + LOG.trace(traceFormatStr + ": returning {}", + arr.length, off, len, filename, block, canSkipChecksum, nRead); + } finally { + if (canSkipChecksum) releaseNoChecksumContext(); + } + return nRead; + } + + private synchronized int readWithoutBounceBuffer(byte arr[], int off, + int len) throws IOException { + freeDataBufIfExists(); + freeChecksumBufIfExists(); + int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos); + if (nRead > 0) { + dataPos += nRead; + } else if ((nRead == 0) && (dataPos == dataIn.size())) { + return -1; + } + return nRead; + } + + private synchronized int readWithBounceBuffer(byte arr[], int off, int len, + boolean canSkipChecksum) throws IOException { + createDataBufIfNeeded(); + if (!dataBuf.hasRemaining()) { + dataBuf.position(0); + dataBuf.limit(maxReadaheadLength); + fillDataBuf(canSkipChecksum); + } + if (dataBuf.remaining() == 0) return -1; + int toRead = Math.min(dataBuf.remaining(), len); + dataBuf.get(arr, off, toRead); + return toRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + int discardedFromBuf = 0; + long remaining = n; + if ((dataBuf != null) && dataBuf.hasRemaining()) { + discardedFromBuf = (int)Math.min(dataBuf.remaining(), n); + dataBuf.position(dataBuf.position() + discardedFromBuf); + remaining -= discardedFromBuf; + } + LOG.trace("skip(n={}, block={}, filename={}): discarded {} bytes from " + + "dataBuf and advanced dataPos by {}", + n, block, filename, discardedFromBuf, remaining); + dataPos += remaining; + return n; + } + + @Override + public int available() { + // 
We never do network I/O in BlockReaderLocal. + return Integer.MAX_VALUE; + } + + @Override + public synchronized void close() throws IOException { + if (closed) return; + closed = true; + LOG.trace("close(filename={}, block={})", filename, block); + replica.unref(); + freeDataBufIfExists(); + freeChecksumBufIfExists(); + } + + @Override + public synchronized void readFully(byte[] arr, int off, int len) + throws IOException { + BlockReaderUtil.readFully(this, arr, off, len); + } + + @Override + public synchronized int readAll(byte[] buf, int off, int len) + throws IOException { + return BlockReaderUtil.readAll(this, buf, off, len); + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public boolean isShortCircuit() { + return true; + } + + /** + * Get or create a memory map for this replica. + * + * There are two kinds of ClientMmap objects we could fetch here: one that + * will always read pre-checksummed data, and one that may read data that + * hasn't been checksummed. + * + * If we fetch the former, "safe" kind of ClientMmap, we have to increment + * the anchor count on the shared memory slot. This will tell the DataNode + * not to munlock the block until this ClientMmap is closed. + * If we fetch the latter, we don't bother with anchoring. + * + * @param opts The options to use, such as SKIP_CHECKSUMS. + * + * @return null on failure; the ClientMmap otherwise. + */ + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + boolean anchor = verifyChecksum && + !opts.contains(ReadOption.SKIP_CHECKSUMS); + if (anchor) { + if (!createNoChecksumContext()) { + LOG.trace("can't get an mmap for {} of {} since SKIP_CHECKSUMS was not " + + "given, we aren't skipping checksums, and the block is not " + + "mlocked.", block, filename); + return null; + } + } + ClientMmap clientMmap = null; + try { + clientMmap = replica.getOrCreateClientMmap(anchor); + } finally { + if ((clientMmap == null) && anchor) { + releaseNoChecksumContext(); + } + } + return clientMmap; + } + + @VisibleForTesting + boolean getVerifyChecksum() { + return this.verifyChecksum; + } + + @VisibleForTesting + int getMaxReadaheadLength() { + return this.maxReadaheadLength; + } + + /** + * Make the replica anchorable. Normally this can only be done by the + * DataNode. This method is only for testing. + */ + @VisibleForTesting + void forceAnchorable() { + replica.getSlot().makeAnchorable(); + } + + /** + * Make the replica unanchorable. Normally this can only be done by the + * DataNode. This method is only for testing. + */ + @VisibleForTesting + void forceUnanchorable() { + replica.getSlot().makeUnanchorable(); + } +}
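
For orientation, here is a minimal usage sketch of the Builder defined above. It is illustrative only: the wrapper class, the readOnce method, and the caller-supplied arguments are assumptions rather than part of this patch, and in the real client this wiring is performed by BlockReaderFactory once a ShortCircuitReplica has been fetched through the ShortCircuitCache. Because BlockReaderLocal is package-private, such code would have to live in org.apache.hadoop.hdfs.client.impl.

// Illustrative sketch only -- not part of this patch.
package org.apache.hadoop.hdfs.client.impl;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.htrace.core.Tracer;

class BlockReaderLocalUsageSketch {
  // All parameters are assumed to be supplied by the surrounding client code.
  static int readOnce(ShortCircuitConf conf, ShortCircuitReplica replica,
      ExtendedBlock block, String path, long startOffset,
      CachingStrategy cachingStrategy, Tracer tracer) throws IOException {
    BlockReader reader = new BlockReaderLocal.Builder(conf)
        .setShortCircuitReplica(replica)     // carries the open data + meta file descriptors
        .setFilename(path)                   // used only for logging
        .setBlock(block)
        .setStartOffset(startOffset)         // clamped to >= 0 by the Builder
        .setCachingStrategy(cachingStrategy) // bounds maxReadahead
        .setVerifyChecksum(true)
        .setStorageType(StorageType.DISK)
        .setTracer(tracer)
        .build();                            // requires a non-null replica
    try {
      ByteBuffer buf = ByteBuffer.allocateDirect(64 * 1024);
      return reader.read(buf);               // -1 once the end of the block is reached
    } finally {
      reader.close();                        // unrefs the replica, frees bounce buffers
    }
  }
}

Note that read(ByteBuffer) reports end-of-block with -1, and close() drops the reader's reference on the replica and returns any bounce buffers to the shared DirectBufferPool.
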
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java new file mode 100644 index 0000000..9887a85 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java @@ -0,0 +1,740 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.hdfs.util.IOUtilsClient; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DirectBufferPool; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client + * is on the same machine as the datanode, then the client can read files + * directly from the local file system rather than going through the datanode + * for better performance. 
<br> + * + * This is the legacy implementation based on HDFS-2246, which requires + * permissions on the datanode to be set so that clients can directly access the + * blocks. The new implementation based on HDFS-347 should be preferred on UNIX + * systems where the required native code has been implemented.<br> + * + * {@link BlockReaderLocalLegacy} works as follows: + * <ul> + * <li>The client performing short circuit reads must be configured at the + * datanode.</li> + * <li>The client gets the path to the file where block is stored using + * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)} + * RPC call</li> + * <li>Client uses kerberos authentication to connect to the datanode over RPC, + * if security is enabled.</li> + * </ul> + */ [email protected] +class BlockReaderLocalLegacy implements BlockReader { + private static final Logger LOG = LoggerFactory.getLogger( + BlockReaderLocalLegacy.class); + + //Stores the cache and proxy for a local datanode. + private static class LocalDatanodeInfo { + private ClientDatanodeProtocol proxy = null; + private final Map<ExtendedBlock, BlockLocalPathInfo> cache; + + LocalDatanodeInfo() { + final int cacheSize = 10000; + final float hashTableLoadFactor = 0.75f; + int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + + 1; + cache = Collections + .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>( + hashTableCapacity, hashTableLoadFactor, true) { + private static final long serialVersionUID = 1; + + @Override + protected boolean removeEldestEntry( + Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) { + return size() > cacheSize; + } + }); + } + + private synchronized ClientDatanodeProtocol getDatanodeProxy( + UserGroupInformation ugi, final DatanodeInfo node, + final Configuration conf, final int socketTimeout, + final boolean connectToDnViaHostname) throws IOException { + if (proxy == null) { + try { + proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() { + @Override + public ClientDatanodeProtocol run() throws Exception { + return DFSUtilClient.createClientDatanodeProtocolProxy(node, conf, + socketTimeout, connectToDnViaHostname); + } + }); + } catch (InterruptedException e) { + LOG.warn("encountered exception ", e); + } + } + return proxy; + } + + private synchronized void resetDatanodeProxy() { + if (null != proxy) { + RPC.stopProxy(proxy); + proxy = null; + } + } + + private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) { + return cache.get(b); + } + + private void setBlockLocalPathInfo(ExtendedBlock b, + BlockLocalPathInfo info) { + cache.put(b, info); + } + + private void removeBlockLocalPathInfo(ExtendedBlock b) { + cache.remove(b); + } + } + + // Multiple datanodes could be running on the local machine. Store proxies in + // a map keyed by the ipc port of the datanode. + private static final Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = + new HashMap<>(); + + private final FileInputStream dataIn; // reader for the data file + private final FileInputStream checksumIn; // reader for the checksum file + + /** + * Offset from the most recent chunk boundary at which the next read should + * take place. Is only set to non-zero at construction time, and is + * decremented (usually to 0) by subsequent reads. This avoids having to do a + * checksum read at construction to position the read cursor correctly. 
+ */ + private int offsetFromChunkBoundary; + + private byte[] skipBuf = null; + + /** + * Used for checksummed reads that need to be staged before copying to their + * output buffer because they are either a) smaller than the checksum chunk + * size or b) issued by the slower read(byte[]...) path + */ + private ByteBuffer slowReadBuff = null; + private ByteBuffer checksumBuff = null; + private DataChecksum checksum; + private final boolean verifyChecksum; + + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + + private final int bytesPerChecksum; + private final int checksumSize; + + /** offset in block where reader wants to actually read */ + private long startOffset; + private final String filename; + private long blockId; + private final Tracer tracer; + + /** + * The only way this object can be instantiated. + */ + static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf, + UserGroupInformation userGroupInformation, + Configuration configuration, String file, ExtendedBlock blk, + Token<BlockTokenIdentifier> token, DatanodeInfo node, + long startOffset, long length, StorageType storageType, + Tracer tracer) throws IOException { + final ShortCircuitConf scConf = conf.getShortCircuitConf(); + LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node + .getIpcPort()); + // check the cache first + BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk); + if (pathinfo == null) { + if (userGroupInformation == null) { + userGroupInformation = UserGroupInformation.getCurrentUser(); + } + pathinfo = getBlockPathInfo(userGroupInformation, blk, node, + configuration, conf.getSocketTimeout(), token, + conf.isConnectToDnViaHostname(), storageType); + } + + // check to see if the file exists. It may so happen that the + // HDFS file has been deleted and this block-lookup is occurring + // on behalf of a new HDFS file. This time, the block file could + // be residing in a different portion of the fs.data.dir directory. + // In this case, we remove this entry from the cache. The next + // call to this method will re-populate the cache. 
+ FileInputStream dataIn = null; + FileInputStream checksumIn = null; + BlockReaderLocalLegacy localBlockReader = null; + final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums() + || storageType.isTransient(); + try { + // get a local file system + File blkfile = new File(pathinfo.getBlockPath()); + dataIn = new FileInputStream(blkfile); + + LOG.debug("New BlockReaderLocalLegacy for file {} of size {} startOffset " + + "{} length {} short circuit checksum {}", + blkfile, blkfile.length(), startOffset, length, !skipChecksumCheck); + + if (!skipChecksumCheck) { + // get the metadata file + File metafile = new File(pathinfo.getMetaPath()); + checksumIn = new FileInputStream(metafile); + + final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( + new DataInputStream(checksumIn), blk); + long firstChunkOffset = startOffset + - (startOffset % checksum.getBytesPerChecksum()); + localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, + startOffset, checksum, true, dataIn, firstChunkOffset, checksumIn, + tracer); + } else { + localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, + startOffset, dataIn, tracer); + } + } catch (IOException e) { + // remove from cache + localDatanodeInfo.removeBlockLocalPathInfo(blk); + LOG.warn("BlockReaderLocalLegacy: Removing " + blk + + " from cache because local file " + pathinfo.getBlockPath() + + " could not be opened."); + throw e; + } finally { + if (localBlockReader == null) { + if (dataIn != null) { + dataIn.close(); + } + if (checksumIn != null) { + checksumIn.close(); + } + } + } + return localBlockReader; + } + + private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) { + LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port); + if (ldInfo == null) { + ldInfo = new LocalDatanodeInfo(); + localDatanodeInfoMap.put(port, ldInfo); + } + return ldInfo; + } + + private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi, + ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout, + Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname, + StorageType storageType) throws IOException { + LocalDatanodeInfo localDatanodeInfo = + getLocalDatanodeInfo(node.getIpcPort()); + BlockLocalPathInfo pathinfo; + ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node, + conf, timeout, connectToDnViaHostname); + try { + // make RPC to local datanode to find local pathnames of blocks + pathinfo = proxy.getBlockLocalPathInfo(blk, token); + // We can't cache the path information for a replica on transient storage. + // If the replica gets evicted, then it moves to a different path. Then, + // our next attempt to read from the cached path would fail to find the + // file. Additionally, the failure would cause us to disable legacy + // short-circuit read for all subsequent use in the ClientContext. Unlike + // the newer short-circuit read implementation, we have no communication + // channel for the DataNode to notify the client that the path has been + // invalidated. Therefore, our only option is to skip caching. 
+ if (pathinfo != null && !storageType.isTransient()) { + LOG.debug("Cached location of block {} as {}", blk, pathinfo); + localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo); + } + } catch (IOException e) { + localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error + throw e; + } + return pathinfo; + } + + private static int getSlowReadBufferNumChunks(int bufferSizeBytes, + int bytesPerChecksum) { + if (bufferSizeBytes < bytesPerChecksum) { + throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " + + "buffer size (" + bufferSizeBytes + ") is not large enough to hold " + + "a single chunk (" + bytesPerChecksum + "). Please configure " + + HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY + + " appropriately"); + } + + // Round down to nearest chunk size + return bufferSizeBytes / bytesPerChecksum; + } + + private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, + ExtendedBlock block, long startOffset, FileInputStream dataIn, + Tracer tracer) throws IOException { + this(conf, hdfsfile, block, startOffset, + DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false, + dataIn, startOffset, null, tracer); + } + + private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, + ExtendedBlock block, long startOffset, DataChecksum checksum, + boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, + FileInputStream checksumIn, Tracer tracer) throws IOException { + this.filename = hdfsfile; + this.checksum = checksum; + this.verifyChecksum = verifyChecksum; + this.startOffset = Math.max(startOffset, 0); + this.blockId = block.getBlockId(); + + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + + this.dataIn = dataIn; + this.checksumIn = checksumIn; + this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); + + final int chunksPerChecksumRead = getSlowReadBufferNumChunks( + conf.getShortCircuitBufferSize(), bytesPerChecksum); + slowReadBuff = bufferPool.getBuffer( + bytesPerChecksum * chunksPerChecksumRead); + checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); + // Initially the buffers have nothing to read. + slowReadBuff.flip(); + checksumBuff.flip(); + boolean success = false; + try { + // Skip both input streams to beginning of the chunk containing + // startOffset + IOUtils.skipFully(dataIn, firstChunkOffset); + if (checksumIn != null) { + long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * + checksumSize; + IOUtils.skipFully(checksumIn, checkSumOffset); + } + success = true; + } finally { + if (!success) { + bufferPool.returnBuffer(slowReadBuff); + bufferPool.returnBuffer(checksumBuff); + } + } + this.tracer = tracer; + } + + /** + * Reads bytes into a buffer until EOF or the buffer's limit is reached + */ + private int fillBuffer(FileInputStream stream, ByteBuffer buf) + throws IOException { + try (TraceScope ignored = tracer. + newScope("BlockReaderLocalLegacy#fillBuffer(" + blockId + ")")) { + int bytesRead = stream.getChannel().read(buf); + if (bytesRead < 0) { + //EOF + return bytesRead; + } + while (buf.remaining() > 0) { + int n = stream.getChannel().read(buf); + if (n < 0) { + //EOF + return bytesRead; + } + bytesRead += n; + } + return bytesRead; + } + } + + /** + * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into + * another. 
+ */ + private void writeSlice(ByteBuffer from, ByteBuffer to, int length) { + int oldLimit = from.limit(); + from.limit(from.position() + length); + try { + to.put(from); + } finally { + from.limit(oldLimit); + } + } + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + int nRead = 0; + if (verifyChecksum) { + // A 'direct' read actually has three phases. The first drains any + // remaining bytes from the slow read buffer. After this the read is + // guaranteed to be on a checksum chunk boundary. If there are still bytes + // to read, the fast direct path is used for as many remaining bytes as + // possible, up to a multiple of the checksum chunk size. Finally, any + // 'odd' bytes remaining at the end of the read cause another slow read to + // be issued, which involves an extra copy. + + // Every 'slow' read tries to fill the slow read buffer in one go for + // efficiency's sake. As described above, all non-checksum-chunk-aligned + // reads will be served from the slower read path. + + if (slowReadBuff.hasRemaining()) { + // There are remaining bytes from a small read available. This usually + // means this read is unaligned, which falls back to the slow path. + int fromSlowReadBuff = Math.min(buf.remaining(), + slowReadBuff.remaining()); + writeSlice(slowReadBuff, buf, fromSlowReadBuff); + nRead += fromSlowReadBuff; + } + + if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) { + // Since we have drained the 'small read' buffer, we are guaranteed to + // be chunk-aligned + int len = buf.remaining() - (buf.remaining() % bytesPerChecksum); + + // There's only enough checksum buffer space available to checksum one + // entire slow read buffer. This saves keeping the number of checksum + // chunks around. + len = Math.min(len, slowReadBuff.capacity()); + int oldlimit = buf.limit(); + buf.limit(buf.position() + len); + int readResult = 0; + try { + readResult = doByteBufferRead(buf); + } finally { + buf.limit(oldlimit); + } + if (readResult == -1) { + return nRead; + } else { + nRead += readResult; + buf.position(buf.position() + readResult); + } + } + + // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read + // until chunk boundary + if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || + offsetFromChunkBoundary > 0) { + int toRead = Math.min(buf.remaining(), + bytesPerChecksum - offsetFromChunkBoundary); + int readResult = fillSlowReadBuffer(toRead); + if (readResult == -1) { + return nRead; + } else { + int fromSlowReadBuff = Math.min(readResult, buf.remaining()); + writeSlice(slowReadBuff, buf, fromSlowReadBuff); + nRead += fromSlowReadBuff; + } + } + } else { + // Non-checksummed reads are much easier; we can just fill the buffer + // directly. + nRead = doByteBufferRead(buf); + if (nRead > 0) { + buf.position(buf.position() + nRead); + } + } + return nRead; + } + + /** + * Tries to read as many bytes as possible into supplied buffer, checksumming + * each chunk if needed. + * + * <b>Preconditions:</b> + * <ul> + * <li> + * If checksumming is enabled, buf.remaining must be a multiple of + * bytesPerChecksum. Note that this is not a requirement for clients of + * read(ByteBuffer) - in the case of non-checksum-sized read requests, + * read(ByteBuffer) will substitute a suitably sized buffer to pass to this + * method. 
+ * </li> + * </ul> + * <b>Postconditions:</b> + * <ul> + * <li>buf.limit and buf.mark are unchanged.</li> + * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the + * requested bytes can be read straight from the buffer</li> + * </ul> + * + * @param buf + * byte buffer to write bytes to. If checksums are not required, buf + * can have any number of bytes remaining, otherwise there must be a + * multiple of the checksum chunk size remaining. + * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt> + * that is, the the number of useful bytes (up to the amount + * requested) readable from the buffer by the client. + */ + private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException { + if (verifyChecksum) { + assert buf.remaining() % bytesPerChecksum == 0; + } + int dataRead; + + int oldpos = buf.position(); + // Read as much as we can into the buffer. + dataRead = fillBuffer(dataIn, buf); + + if (dataRead == -1) { + return -1; + } + + if (verifyChecksum) { + ByteBuffer toChecksum = buf.duplicate(); + toChecksum.position(oldpos); + toChecksum.limit(oldpos + dataRead); + + checksumBuff.clear(); + // Equivalent to + // (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum ); + int numChunks = + (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum; + checksumBuff.limit(checksumSize * numChunks); + + fillBuffer(checksumIn, checksumBuff); + checksumBuff.flip(); + + checksum.verifyChunkedSums(toChecksum, checksumBuff, filename, + this.startOffset); + } + + if (dataRead >= 0) { + buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead)); + } + + if (dataRead < offsetFromChunkBoundary) { + // yikes, didn't even get enough bytes to honour offset. This can happen + // even if we are verifying checksums if we are at EOF. + offsetFromChunkBoundary -= dataRead; + dataRead = 0; + } else { + dataRead -= offsetFromChunkBoundary; + offsetFromChunkBoundary = 0; + } + + return dataRead; + } + + /** + * Ensures that up to len bytes are available and checksummed in the slow read + * buffer. The number of bytes available to read is returned. If the buffer is + * not already empty, the number of remaining bytes is returned and no actual + * read happens. + * + * @param len + * the maximum number of bytes to make available. After len bytes + * are read, the underlying bytestream <b>must</b> be at a checksum + * boundary, or EOF. That is, (len + currentPosition) % + * bytesPerChecksum == 0. + * @return the number of bytes available to read, or -1 if EOF. + */ + private synchronized int fillSlowReadBuffer(int len) throws IOException { + int nRead; + if (slowReadBuff.hasRemaining()) { + // Already got data, good to go. + nRead = Math.min(len, slowReadBuff.remaining()); + } else { + // Round a complete read of len bytes (plus any implicit offset) to the + // next chunk boundary, since we try and read in multiples of a chunk + int nextChunk = len + offsetFromChunkBoundary + + (bytesPerChecksum - + ((len + offsetFromChunkBoundary) % bytesPerChecksum)); + int limit = Math.min(nextChunk, slowReadBuff.capacity()); + assert limit % bytesPerChecksum == 0; + + slowReadBuff.clear(); + slowReadBuff.limit(limit); + + nRead = doByteBufferRead(slowReadBuff); + + if (nRead > 0) { + // So that next time we call slowReadBuff.hasRemaining(), we don't get a + // false positive. 
+ slowReadBuff.limit(nRead + slowReadBuff.position()); + } + } + return nRead; + } + + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + LOG.trace("read off {} len {}", off, len); + if (!verifyChecksum) { + return dataIn.read(buf, off, len); + } + + int nRead = fillSlowReadBuffer(slowReadBuff.capacity()); + + if (nRead > 0) { + // Possible that buffer is filled with a larger read than we need, since + // we tried to read as much as possible at once + nRead = Math.min(len, nRead); + slowReadBuff.get(buf, off, nRead); + } + + return nRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + LOG.debug("skip {}", n); + if (n <= 0) { + return 0; + } + if (!verifyChecksum) { + return dataIn.skip(n); + } + + // caller made sure newPosition is not beyond EOF. + int remaining = slowReadBuff.remaining(); + int position = slowReadBuff.position(); + int newPosition = position + (int)n; + + // if the new offset is already read into dataBuff, just reposition + if (n <= remaining) { + assert offsetFromChunkBoundary == 0; + slowReadBuff.position(newPosition); + return n; + } + + // for small gap, read through to keep the data/checksum in sync + if (n - remaining <= bytesPerChecksum) { + slowReadBuff.position(position + remaining); + if (skipBuf == null) { + skipBuf = new byte[bytesPerChecksum]; + } + int ret = read(skipBuf, 0, (int)(n - remaining)); + return (remaining + ret); + } + + // optimize for big gap: discard the current buffer, skip to + // the beginning of the appropriate checksum chunk and then + // read to the middle of that chunk to be in sync with checksums. + + // We can't use this.offsetFromChunkBoundary because we need to know how + // many bytes of the offset were really read. Calling read(..) with a + // positive this.offsetFromChunkBoundary causes that many bytes to get + // silently skipped. + int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum; + long toskip = n - remaining - myOffsetFromChunkBoundary; + + slowReadBuff.position(slowReadBuff.limit()); + checksumBuff.position(checksumBuff.limit()); + + IOUtils.skipFully(dataIn, toskip); + long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize; + IOUtils.skipFully(checksumIn, checkSumOffset); + + // read into the middle of the chunk + if (skipBuf == null) { + skipBuf = new byte[bytesPerChecksum]; + } + assert skipBuf.length == bytesPerChecksum; + assert myOffsetFromChunkBoundary < bytesPerChecksum; + + int ret = read(skipBuf, 0, myOffsetFromChunkBoundary); + + if (ret == -1) { // EOS + return (toskip + remaining); + } else { + return (toskip + remaining + ret); + } + } + + @Override + public synchronized void close() throws IOException { + IOUtilsClient.cleanup(LOG, dataIn, checksumIn); + if (slowReadBuff != null) { + bufferPool.returnBuffer(slowReadBuff); + slowReadBuff = null; + } + if (checksumBuff != null) { + bufferPool.returnBuffer(checksumBuff); + checksumBuff = null; + } + startOffset = -1; + checksum = null; + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public void readFully(byte[] buf, int off, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, off, len); + } + + @Override + public int available() { + // We never do network I/O in BlockReaderLocalLegacy. 
+ return Integer.MAX_VALUE; + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public boolean isShortCircuit() { + return true; + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java new file mode 100644 index 0000000..b008654 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java @@ -0,0 +1,513 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.FSInputChecker; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.PeerCache; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+ +/** + * @deprecated this is an old implementation that is being left around + * in case any issues spring up with the new {@link BlockReaderRemote2} + * implementation. + * It will be removed in the next release. + */ [email protected] +@Deprecated +public class BlockReaderRemote extends FSInputChecker implements BlockReader { + static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class); + + private final Peer peer; + private final DatanodeID datanodeID; + private final DataInputStream in; + private DataChecksum checksum; + + /** offset in block of the last chunk received */ + private long lastChunkOffset = -1; + private long lastChunkLen = -1; + private long lastSeqNo = -1; + + /** offset in block where reader wants to actually read */ + private long startOffset; + + private final long blockId; + + /** offset in block of of first chunk - may be less than startOffset + if startOffset is not chunk-aligned */ + private final long firstChunkOffset; + + private final int bytesPerChecksum; + private final int checksumSize; + + /** + * The total number of bytes we need to transfer from the DN. + * This is the amount that the user has requested plus some padding + * at the beginning so that the read can begin on a chunk boundary. + */ + private final long bytesNeededToFinish; + + /** + * True if we are reading from a local DataNode. + */ + private final boolean isLocal; + + private boolean eos = false; + private boolean sentStatusCode = false; + + ByteBuffer checksumBytes = null; + /** Amount of unread data in the current received packet */ + int dataLeft = 0; + + private final PeerCache peerCache; + + private final Tracer tracer; + + /* FSInputChecker interface */ + + /* same interface as inputStream java.io.InputStream#read() + * used by DFSInputStream#read() + * This violates one rule when there is a checksum error: + * "Read should not modify user buffer before successful read" + * because it first reads the data to user buffer and then checks + * the checksum. + */ + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + + // This has to be set here, *before* the skip, since we can + // hit EOS during the skip, in the case that our entire read + // is smaller than the checksum chunk. + boolean eosBefore = eos; + + //for the first read, skip the extra bytes at the front. + if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { + // Skip these bytes. But don't call this.skip()! + int toSkip = (int)(startOffset - firstChunkOffset); + if ( super.readAndDiscard(toSkip) != toSkip ) { + // should never happen + throw new IOException("Could not skip required number of bytes"); + } + } + + int nRead = super.read(buf, off, len); + + // if eos was set in the previous read, send a status code to the DN + if (eos && !eosBefore && nRead >= 0) { + if (needChecksum()) { + sendReadResult(peer, Status.CHECKSUM_OK); + } else { + sendReadResult(peer, Status.SUCCESS); + } + } + return nRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + /* How can we make sure we don't throw a ChecksumException, at least + * in majority of the cases?. This one throws. */ + long nSkipped = 0; + while (nSkipped < n) { + int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE); + int ret = readAndDiscard(toSkip); + if (ret <= 0) { + return nSkipped; + } + nSkipped += ret; + } + return nSkipped; + } + + @Override + public int read() throws IOException { + throw new IOException("read() is not expected to be invoked. 
" + + "Use read(buf, off, len) instead."); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + /* Checksum errors are handled outside the BlockReader. + * DFSInputStream does not always call 'seekToNewSource'. In the + * case of pread(), it just tries a different replica without seeking. + */ + return false; + } + + @Override + public void seek(long pos) throws IOException { + throw new IOException("Seek() is not supported in BlockInputChecker"); + } + + @Override + protected long getChunkPosition(long pos) { + throw new RuntimeException("getChunkPosition() is not supported, " + + "since seek is not required"); + } + + /** + * Makes sure that checksumBytes has enough capacity + * and limit is set to the number of checksum bytes needed + * to be read. + */ + private void adjustChecksumBytes(int dataLen) { + int requiredSize = + ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; + if (checksumBytes == null || requiredSize > checksumBytes.capacity()) { + checksumBytes = ByteBuffer.wrap(new byte[requiredSize]); + } else { + checksumBytes.clear(); + } + checksumBytes.limit(requiredSize); + } + + @Override + protected synchronized int readChunk(long pos, byte[] buf, int offset, + int len, byte[] checksumBuf) + throws IOException { + try (TraceScope ignored = tracer.newScope( + "BlockReaderRemote#readChunk(" + blockId + ")")) { + return readChunkImpl(pos, buf, offset, len, checksumBuf); + } + } + + private synchronized int readChunkImpl(long pos, byte[] buf, int offset, + int len, byte[] checksumBuf) + throws IOException { + // Read one chunk. + if (eos) { + // Already hit EOF + return -1; + } + + // Read one DATA_CHUNK. + long chunkOffset = lastChunkOffset; + if ( lastChunkLen > 0 ) { + chunkOffset += lastChunkLen; + } + + // pos is relative to the start of the first chunk of the read. + // chunkOffset is relative to the start of the block. + // This makes sure that the read passed from FSInputChecker is the + // for the same chunk we expect to be reading from the DN. + if ( (pos + firstChunkOffset) != chunkOffset ) { + throw new IOException("Mismatch in pos : " + pos + " + " + + firstChunkOffset + " != " + chunkOffset); + } + + // Read next packet if the previous packet has been read completely. + if (dataLeft <= 0) { + //Read packet headers. 
+ PacketHeader header = new PacketHeader(); + header.readFields(in); + + LOG.debug("DFSClient readChunk got header {}", header); + + // Sanity check the lengths + if (!header.sanityCheck(lastSeqNo)) { + throw new IOException("BlockReader: error in packet header " + + header); + } + + lastSeqNo = header.getSeqno(); + dataLeft = header.getDataLen(); + adjustChecksumBytes(header.getDataLen()); + if (header.getDataLen() > 0) { + IOUtils.readFully(in, checksumBytes.array(), 0, + checksumBytes.limit()); + } + } + + // Sanity checks + assert len >= bytesPerChecksum; + assert checksum != null; + assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0); + + + int checksumsToRead, bytesToRead; + + if (checksumSize > 0) { + + // How many chunks left in our packet - this is a ceiling + // since we may have a partial chunk at the end of the file + int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1; + + // How many chunks we can fit in databuffer + // - note this is a floor since we always read full chunks + int chunksCanFit = Math.min(len / bytesPerChecksum, + checksumBuf.length / checksumSize); + + // How many chunks should we read + checksumsToRead = Math.min(chunksLeft, chunksCanFit); + // How many bytes should we actually read + bytesToRead = Math.min( + checksumsToRead * bytesPerChecksum, // full chunks + dataLeft); // in case we have a partial + } else { + // no checksum + bytesToRead = Math.min(dataLeft, len); + checksumsToRead = 0; + } + + if ( bytesToRead > 0 ) { + // Assert we have enough space + assert bytesToRead <= len; + assert checksumBytes.remaining() >= checksumSize * checksumsToRead; + assert checksumBuf.length >= checksumSize * checksumsToRead; + IOUtils.readFully(in, buf, offset, bytesToRead); + checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead); + } + + dataLeft -= bytesToRead; + assert dataLeft >= 0; + + lastChunkOffset = chunkOffset; + lastChunkLen = bytesToRead; + + // If there's no data left in the current packet after satisfying + // this read, and we have satisfied the client read, we expect + // an empty packet header from the DN to signify this. + // Note that pos + bytesToRead may in fact be greater since the + // DN finishes off the entire last chunk. + if (dataLeft == 0 && + pos + bytesToRead >= bytesNeededToFinish) { + + // Read header + PacketHeader hdr = new PacketHeader(); + hdr.readFields(in); + + if (!hdr.isLastPacketInBlock() || + hdr.getDataLen() != 0) { + throw new IOException("Expected empty end-of-read packet! Header: " + + hdr); + } + + eos = true; + } + + if ( bytesToRead == 0 ) { + return -1; + } + + return bytesToRead; + } + + private BlockReaderRemote(String file, String bpid, long blockId, + DataInputStream in, DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, + DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) { + // Path is used only for printing block and file information in debug + super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId + + ":" + bpid + ":of:"+ file)/*too non path-like?*/, + 1, verifyChecksum, + checksum.getChecksumSize() > 0? checksum : null, + checksum.getBytesPerChecksum(), + checksum.getChecksumSize()); + + this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. 
+ createSocketAddr(datanodeID.getXferAddr())); + + this.peer = peer; + this.datanodeID = datanodeID; + this.in = in; + this.checksum = checksum; + this.startOffset = Math.max( startOffset, 0 ); + this.blockId = blockId; + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. + this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); + + this.firstChunkOffset = firstChunkOffset; + lastChunkOffset = firstChunkOffset; + lastChunkLen = -1; + + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + this.peerCache = peerCache; + this.tracer = tracer; + } + + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. + * + * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param bufferSize The IO buffer size (not the client buffer size) + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @return New BlockReader instance, or null on error. + */ + public static BlockReaderRemote newBlockReader(String file, + ExtendedBlock block, + Token<BlockTokenIdentifier> blockToken, + long startOffset, long len, + int bufferSize, boolean verifyChecksum, + String clientName, Peer peer, + DatanodeID datanodeID, + PeerCache peerCache, + CachingStrategy cachingStrategy, + Tracer tracer) + throws IOException { + // in and out will be closed when sock is closed (by the caller) + final DataOutputStream out = + new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); + new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, + verifyChecksum, cachingStrategy); + + // + // Get bytes in block, set streams + // + + DataInputStream in = new DataInputStream( + new BufferedInputStream(peer.getInputStream(), bufferSize)); + + BlockOpResponseProto status = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(in)); + BlockReaderRemote2.checkSuccess(status, peer, block, file); + ReadOpChecksumInfoProto checksumInfo = + status.getReadOpChecksumInfo(); + DataChecksum checksum = DataTransferProtoUtil.fromProto( + checksumInfo.getChecksum()); + //Warning when we get CHECKSUM_NULL? + + // Read the first chunk offset. + long firstChunkOffset = checksumInfo.getChunkOffset(); + + if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || + firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { + throw new IOException("BlockReader: error in first chunk offset (" + + firstChunkOffset + ") startOffset is " + + startOffset + " for file " + file); + } + + return new BlockReaderRemote(file, block.getBlockPoolId(), block.getBlockId(), + in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, + peer, datanodeID, peerCache, tracer); + } + + @Override + public synchronized void close() throws IOException { + startOffset = -1; + checksum = null; + if (peerCache != null & sentStatusCode) { + peerCache.put(datanodeID, peer); + } else { + peer.close(); + } + + // in will be closed when its Socket is closed. 
+ } + + @Override + public void readFully(byte[] buf, int readOffset, int amtToRead) + throws IOException { + IOUtils.readFully(this, buf, readOffset, amtToRead); + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return readFully(this, buf, offset, len); + } + + /** + * When the reader reaches end of the read, it sends a status response + * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN + * closing our connection (which we will re-open), but won't affect + * data correctness. + */ + void sendReadResult(Peer peer, Status statusCode) { + assert !sentStatusCode : "already sent status code to " + peer; + try { + BlockReaderRemote2.writeReadResult(peer.getOutputStream(), statusCode); + sentStatusCode = true; + } catch (IOException e) { + // It's ok not to be able to send this. But something is probably wrong. + LOG.info("Could not send read status (" + statusCode + ") to datanode " + + peer.getRemoteAddressString() + ": " + e.getMessage()); + } + } + + @Override + public int read(ByteBuffer buf) throws IOException { + throw new UnsupportedOperationException("readDirect unsupported in BlockReaderRemote"); + } + + @Override + public int available() { + // An optimistic estimate of how much data is available + // to us without doing network I/O. + return BlockReaderRemote2.TCP_WINDOW_SIZE; + } + + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public boolean isShortCircuit() { + return false; + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java new file mode 100644 index 0000000..4f2aaab --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java @@ -0,0 +1,473 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.EnumSet; +import java.util.UUID; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.PeerCache; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.htrace.core.TraceScope; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.htrace.core.Tracer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a wrapper around connection to datanode + * and understands checksum, offset etc. + * + * Terminology: + * <dl> + * <dt>block</dt> + * <dd>The hdfs block, typically large (~64MB). + * </dd> + * <dt>chunk</dt> + * <dd>A block is divided into chunks, each comes with a checksum. + * We want transfers to be chunk-aligned, to be able to + * verify checksums. + * </dd> + * <dt>packet</dt> + * <dd>A grouping of chunks used for transport. It contains a + * header, followed by checksum data, followed by real data. + * </dd> + * </dl> + * Please see DataNode for the RPC specification. + * + * This is a new implementation introduced in Hadoop 0.23 which + * is more efficient and simpler than the older BlockReader + * implementation. It should be renamed to BlockReaderRemote + * once we are confident in it. 
+ */
+@InterfaceAudience.Private
+public class BlockReaderRemote2 implements BlockReader {
+
+  static final Logger LOG = LoggerFactory.getLogger(BlockReaderRemote2.class);
+  static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
+
+  final private Peer peer;
+  final private DatanodeID datanodeID;
+  final private PeerCache peerCache;
+  final private long blockId;
+  private final ReadableByteChannel in;
+
+  private DataChecksum checksum;
+  private final PacketReceiver packetReceiver = new PacketReceiver(true);
+
+  private ByteBuffer curDataSlice = null;
+
+  /** sequence number of the last packet received */
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+  private final String filename;
+
+  private final int bytesPerChecksum;
+  private final int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private long bytesNeededToFinish;
+
+  /**
+   * True if we are reading from a local DataNode.
+   */
+  private final boolean isLocal;
+
+  private final boolean verifyChecksum;
+
+  private boolean sentStatusCode = false;
+
+  private final Tracer tracer;
+
+  @VisibleForTesting
+  public Peer getPeer() {
+    return peer;
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len)
+      throws IOException {
+    UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null);
+    LOG.trace("Starting read #{} file {} from datanode {}",
+        randomId, filename, datanodeID.getHostName());
+
+    if (curDataSlice == null ||
+        (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+      try (TraceScope ignored = tracer.newScope(
+          "BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
+        readNextPacket();
+      }
+    }
+
+    LOG.trace("Finishing read #{}", randomId);
+
+    if (curDataSlice.remaining() == 0) {
+      // we're at EOF now
+      return -1;
+    }
+
+    int nRead = Math.min(curDataSlice.remaining(), len);
+    curDataSlice.get(buf, off, nRead);
+
+    return nRead;
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    if (curDataSlice == null ||
+        (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+      try (TraceScope ignored = tracer.newScope(
+          "BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
+        readNextPacket();
+      }
+    }
+    if (curDataSlice.remaining() == 0) {
+      // we're at EOF now
+      return -1;
+    }
+
+    int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
+    ByteBuffer writeSlice = curDataSlice.duplicate();
+    writeSlice.limit(writeSlice.position() + nRead);
+    buf.put(writeSlice);
+    curDataSlice.position(writeSlice.position());
+
+    return nRead;
+  }
+
+  private void readNextPacket() throws IOException {
+    // Read packet headers.
+ packetReceiver.receiveNextPacket(in); + + PacketHeader curHeader = packetReceiver.getHeader(); + curDataSlice = packetReceiver.getDataSlice(); + assert curDataSlice.capacity() == curHeader.getDataLen(); + + LOG.trace("DFSClient readNextPacket got header {}", curHeader); + + // Sanity check the lengths + if (!curHeader.sanityCheck(lastSeqNo)) { + throw new IOException("BlockReader: error in packet header " + + curHeader); + } + + if (curHeader.getDataLen() > 0) { + int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; + int checksumsLen = chunks * checksumSize; + + assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : + "checksum slice capacity=" + + packetReceiver.getChecksumSlice().capacity() + + " checksumsLen=" + checksumsLen; + + lastSeqNo = curHeader.getSeqno(); + if (verifyChecksum && curDataSlice.remaining() > 0) { + // N.B.: the checksum error offset reported here is actually + // relative to the start of the block, not the start of the file. + // This is slightly misleading, but preserves the behavior from + // the older BlockReader. + checksum.verifyChunkedSums(curDataSlice, + packetReceiver.getChecksumSlice(), + filename, curHeader.getOffsetInBlock()); + } + bytesNeededToFinish -= curHeader.getDataLen(); + } + + // First packet will include some data prior to the first byte + // the user requested. Skip it. + if (curHeader.getOffsetInBlock() < startOffset) { + int newPos = (int) (startOffset - curHeader.getOffsetInBlock()); + curDataSlice.position(newPos); + } + + // If we've now satisfied the whole client read, read one last packet + // header, which should be empty + if (bytesNeededToFinish <= 0) { + readTrailingEmptyPacket(); + if (verifyChecksum) { + sendReadResult(Status.CHECKSUM_OK); + } else { + sendReadResult(Status.SUCCESS); + } + } + } + + @Override + public synchronized long skip(long n) throws IOException { + /* How can we make sure we don't throw a ChecksumException, at least + * in majority of the cases?. This one throws. */ + long skipped = 0; + while (skipped < n) { + long needToSkip = n - skipped; + if (curDataSlice == null || + curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + readNextPacket(); + } + if (curDataSlice.remaining() == 0) { + // we're at EOF now + break; + } + + int skip = (int)Math.min(curDataSlice.remaining(), needToSkip); + curDataSlice.position(curDataSlice.position() + skip); + skipped += skip; + } + return skipped; + } + + private void readTrailingEmptyPacket() throws IOException { + LOG.trace("Reading empty packet at end of read"); + + packetReceiver.receiveNextPacket(in); + + PacketHeader trailer = packetReceiver.getHeader(); + if (!trailer.isLastPacketInBlock() || + trailer.getDataLen() != 0) { + throw new IOException("Expected empty end-of-read packet! Header: " + + trailer); + } + } + + protected BlockReaderRemote2(String file, long blockId, + DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, + DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) { + this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. 
+ createSocketAddr(datanodeID.getXferAddr())); + // Path is used only for printing block and file information in debug + this.peer = peer; + this.datanodeID = datanodeID; + this.in = peer.getInputStreamChannel(); + this.checksum = checksum; + this.verifyChecksum = verifyChecksum; + this.startOffset = Math.max( startOffset, 0 ); + this.filename = file; + this.peerCache = peerCache; + this.blockId = blockId; + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. + this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + this.tracer = tracer; + } + + + @Override + public synchronized void close() throws IOException { + packetReceiver.close(); + startOffset = -1; + checksum = null; + if (peerCache != null && sentStatusCode) { + peerCache.put(datanodeID, peer); + } else { + peer.close(); + } + + // in will be closed when its Socket is closed. + } + + /** + * When the reader reaches end of the read, it sends a status response + * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN + * closing our connection (which we will re-open), but won't affect + * data correctness. + */ + void sendReadResult(Status statusCode) { + assert !sentStatusCode : "already sent status code to " + peer; + try { + writeReadResult(peer.getOutputStream(), statusCode); + sentStatusCode = true; + } catch (IOException e) { + // It's ok not to be able to send this. But something is probably wrong. + LOG.info("Could not send read status (" + statusCode + ") to datanode " + + peer.getRemoteAddressString() + ": " + e.getMessage()); + } + } + + /** + * Serialize the actual read result on the wire. + */ + static void writeReadResult(OutputStream out, Status statusCode) + throws IOException { + + ClientReadStatusProto.newBuilder() + .setStatus(statusCode) + .build() + .writeDelimitedTo(out); + + out.flush(); + } + + /** + * File name to print when accessing a block directly (from servlets) + * @param s Address of the block location + * @param poolId Block pool ID of the block + * @param blockId Block ID of the block + * @return string that has a file name for debug purposes + */ + public static String getFileName(final InetSocketAddress s, + final String poolId, final long blockId) { + return s.toString() + ":" + poolId + ":" + blockId; + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public void readFully(byte[] buf, int off, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, off, len); + } + + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. + * + * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @param peer The Peer to use + * @param datanodeID The DatanodeID this peer is connected to + * @return New BlockReader instance, or null on error. 
+   */
+  public static BlockReader newBlockReader(String file,
+      ExtendedBlock block,
+      Token<BlockTokenIdentifier> blockToken,
+      long startOffset, long len,
+      boolean verifyChecksum,
+      String clientName,
+      Peer peer, DatanodeID datanodeID,
+      PeerCache peerCache,
+      CachingStrategy cachingStrategy,
+      Tracer tracer) throws IOException {
+    // in and out will be closed when sock is closed (by the caller)
+    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+        peer.getOutputStream()));
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+        verifyChecksum, cachingStrategy);
+
+    //
+    // Get bytes in block
+    //
+    DataInputStream in = new DataInputStream(peer.getInputStream());
+
+    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+        PBHelperClient.vintPrefixed(in));
+    checkSuccess(status, peer, block, file);
+    ReadOpChecksumInfoProto checksumInfo =
+        status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
+    // Warning when we get CHECKSUM_NULL?
+
+    // Read the first chunk offset.
+    long firstChunkOffset = checksumInfo.getChunkOffset();
+
+    if (firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+          firstChunkOffset + ") startOffset is " +
+          startOffset + " for file " + file);
+    }
+
+    return new BlockReaderRemote2(file, block.getBlockId(), checksum,
+        verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
+        peerCache, tracer);
+  }
+
+  static void checkSuccess(
+      BlockOpResponseProto status, Peer peer,
+      ExtendedBlock block, String file)
+      throws IOException {
+    String logInfo = "for OP_READ_BLOCK"
+        + ", self=" + peer.getLocalAddressString()
+        + ", remote=" + peer.getRemoteAddressString()
+        + ", for file " + file
+        + ", for pool " + block.getBlockPoolId()
+        + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
+    DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
+  }
+
+  @Override
+  public int available() {
+    // An optimistic estimate of how much data is available
+    // to us without doing network I/O.
+    return TCP_WINDOW_SIZE;
+  }
+
+  @Override
+  public boolean isLocal() {
+    return isLocal;
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return false;
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    return null;
+  }
+}
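
The class javadoc above defines the block/chunk/packet terminology, and readNextPacket() asserts that each packet carries exactly one checksum per chunk of data. A minimal sketch of that per-packet arithmetic; the concrete values are illustrative and not taken from the patch:

    // Per-packet checksum sizing, mirroring the assertion in readNextPacket().
    // bytesPerChecksum and checksumSize come from the negotiated DataChecksum;
    // the numbers below are only an example (CRC32C over 512-byte chunks).
    int bytesPerChecksum = 512;
    int checksumSize = 4;
    int dataLen = 65536;                                // data bytes in one packet

    int chunks = 1 + (dataLen - 1) / bytesPerChecksum;  // ceiling division: 128 chunks
    int checksumsLen = chunks * checksumSize;           // 512 bytes of checksums expected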
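
Both reader constructors compute bytesNeededToFinish as the requested length plus the padding in front of startOffset, and newBlockReader() rejects a firstChunkOffset that is not within one chunk below startOffset. A small self-contained sketch of that alignment arithmetic, assuming a 512-byte chunk size; the class name and values are hypothetical and not part of the patch:

    public class ChunkAlignmentSketch {
      public static void main(String[] args) {
        int bytesPerChecksum = 512;   // chunk size from the DataChecksum
        long startOffset = 1300;      // offset within the block the caller asked for
        long len = 1000;              // bytes the caller wants

        // The DN starts the transfer on the chunk boundary at or below startOffset.
        long firstChunkOffset = (startOffset / bytesPerChecksum) * bytesPerChecksum; // 1024

        // The bounds newBlockReader() enforces on the offset returned by the DN.
        if (firstChunkOffset < 0 || firstChunkOffset > startOffset
            || firstChunkOffset <= (startOffset - bytesPerChecksum)) {
          throw new IllegalStateException("bad first chunk offset " + firstChunkOffset);
        }

        // Total bytes to pull from the DN: requested length plus chunk-alignment padding.
        long bytesNeededToFinish = len + (startOffset - firstChunkOffset); // 1000 + 276 = 1276
        System.out.println("firstChunkOffset=" + firstChunkOffset
            + ", bytesNeededToFinish=" + bytesNeededToFinish);
      }
    }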
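
For reference, a hedged sketch of how a caller could drive the factory and read APIs shown in this patch. The surrounding objects (peer, block, token, caches, tracer) are assumed to be supplied by the DFS client, the helper name readRange is hypothetical, and the imports are the same as in BlockReaderRemote2.java above:

    // Hypothetical helper, not part of the patch: opens a reader for one block
    // range and drains it into a byte array.
    static byte[] readRange(String file, ExtendedBlock block,
        Token<BlockTokenIdentifier> blockToken, long startOffset, int len,
        String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache,
        CachingStrategy cachingStrategy, Tracer tracer) throws IOException {
      // Sends OP_READ_BLOCK and parses the checksum info before returning a reader.
      BlockReader reader = BlockReaderRemote2.newBlockReader(file, block, blockToken,
          startOffset, len, true /* verifyChecksum */, clientName, peer, datanodeID,
          peerCache, cachingStrategy, tracer);
      try {
        byte[] buf = new byte[len];
        // readFully() loops over read() until len bytes have been copied out
        // of the received packets, or throws on premature EOF.
        reader.readFully(buf, 0, len);
        return buf;
      } finally {
        // close() recycles the Peer via the PeerCache only if the client status
        // code was sent (i.e. the read reached the trailing empty packet);
        // otherwise the connection is torn down.
        reader.close();
      }
    }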
