http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java deleted file mode 100644 index 65a8373..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ /dev/null @@ -1,744 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.DataInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; -import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; -import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.hdfs.util.IOUtilsClient; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.DirectBufferPool; -import org.apache.htrace.core.TraceScope; -import org.apache.htrace.core.Tracer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client - * is on the same machine as the datanode, then the client can read files - * directly from the local file system rather than going through the datanode - * for better performance. 
<br> - * - * This is the legacy implementation based on HDFS-2246, which requires - * permissions on the datanode to be set so that clients can directly access the - * blocks. The new implementation based on HDFS-347 should be preferred on UNIX - * systems where the required native code has been implemented.<br> - * - * {@link BlockReaderLocalLegacy} works as follows: - * <ul> - * <li>The client performing short circuit reads must be configured at the - * datanode.</li> - * <li>The client gets the path to the file where the block is stored using the - * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)} - * RPC call</li> - * <li>The client uses Kerberos authentication to connect to the datanode over RPC, - * if security is enabled.</li> - * </ul> - */ -@InterfaceAudience.Private -class BlockReaderLocalLegacy implements BlockReader { - private static final Logger LOG = LoggerFactory.getLogger( - BlockReaderLocalLegacy.class); - - // Stores the cache and proxy for a local datanode. - private static class LocalDatanodeInfo { - private ClientDatanodeProtocol proxy = null; - private final Map<ExtendedBlock, BlockLocalPathInfo> cache; - - LocalDatanodeInfo() { - final int cacheSize = 10000; - final float hashTableLoadFactor = 0.75f; - int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) - + 1; - cache = Collections - .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>( - hashTableCapacity, hashTableLoadFactor, true) { - private static final long serialVersionUID = 1; - - @Override - protected boolean removeEldestEntry( - Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) { - return size() > cacheSize; - } - }); - } - - private synchronized ClientDatanodeProtocol getDatanodeProxy( - UserGroupInformation ugi, final DatanodeInfo node, - final Configuration conf, final int socketTimeout, - final boolean connectToDnViaHostname) throws IOException { - if (proxy == null) { - try { - proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() { - @Override - public ClientDatanodeProtocol run() throws Exception { - return DFSUtilClient.createClientDatanodeProtocolProxy(node, conf, - socketTimeout, connectToDnViaHostname); - } - }); - } catch (InterruptedException e) { - LOG.warn("encountered exception ", e); - } - } - return proxy; - } - - private synchronized void resetDatanodeProxy() { - if (null != proxy) { - RPC.stopProxy(proxy); - proxy = null; - } - } - - private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) { - return cache.get(b); - } - - private void setBlockLocalPathInfo(ExtendedBlock b, - BlockLocalPathInfo info) { - cache.put(b, info); - } - - private void removeBlockLocalPathInfo(ExtendedBlock b) { - cache.remove(b); - } - } - - // Multiple datanodes could be running on the local machine. Store proxies in - // a map keyed by the ipc port of the datanode. - private static final Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = - new HashMap<>(); - - private final FileInputStream dataIn; // reader for the data file - private final FileInputStream checksumIn; // reader for the checksum file - - /** - * Offset from the most recent chunk boundary at which the next read should - * take place. Is only set to non-zero at construction time, and is - * decremented (usually to 0) by subsequent reads. This avoids having to do a - * checksum read at construction to position the read cursor correctly.
- */ - private int offsetFromChunkBoundary; - - private byte[] skipBuf = null; - - /** - * Used for checksummed reads that need to be staged before copying to their - * output buffer because they are either a) smaller than the checksum chunk - * size or b) issued by the slower read(byte[]...) path - */ - private ByteBuffer slowReadBuff = null; - private ByteBuffer checksumBuff = null; - private DataChecksum checksum; - private final boolean verifyChecksum; - - private static final DirectBufferPool bufferPool = new DirectBufferPool(); - - private final int bytesPerChecksum; - private final int checksumSize; - - /** offset in block where reader wants to actually read */ - private long startOffset; - private final String filename; - private long blockId; - private final Tracer tracer; - - /** - * The only way this object can be instantiated. - */ - static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf, - UserGroupInformation userGroupInformation, - Configuration configuration, String file, ExtendedBlock blk, - Token<BlockTokenIdentifier> token, DatanodeInfo node, - long startOffset, long length, StorageType storageType, - Tracer tracer) throws IOException { - final ShortCircuitConf scConf = conf.getShortCircuitConf(); - LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node - .getIpcPort()); - // check the cache first - BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk); - if (pathinfo == null) { - if (userGroupInformation == null) { - userGroupInformation = UserGroupInformation.getCurrentUser(); - } - pathinfo = getBlockPathInfo(userGroupInformation, blk, node, - configuration, conf.getSocketTimeout(), token, - conf.isConnectToDnViaHostname(), storageType); - } - - // check to see if the file exists. It may so happen that the - // HDFS file has been deleted and this block-lookup is occurring - // on behalf of a new HDFS file. This time, the block file could - // be residing in a different portion of the fs.data.dir directory. - // In this case, we remove this entry from the cache. The next - // call to this method will re-populate the cache. 
- FileInputStream dataIn = null; - FileInputStream checksumIn = null; - BlockReaderLocalLegacy localBlockReader = null; - final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums() - || storageType.isTransient(); - try { - // get a local file system - File blkfile = new File(pathinfo.getBlockPath()); - dataIn = new FileInputStream(blkfile); - - LOG.debug("New BlockReaderLocalLegacy for file {} of size {} startOffset " - + "{} length {} short circuit checksum {}", - blkfile, blkfile.length(), startOffset, length, !skipChecksumCheck); - - if (!skipChecksumCheck) { - // get the metadata file - File metafile = new File(pathinfo.getMetaPath()); - checksumIn = new FileInputStream(metafile); - - final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( - new DataInputStream(checksumIn), blk); - long firstChunkOffset = startOffset - - (startOffset % checksum.getBytesPerChecksum()); - localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, - startOffset, checksum, true, dataIn, firstChunkOffset, checksumIn, - tracer); - } else { - localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, - startOffset, dataIn, tracer); - } - } catch (IOException e) { - // remove from cache - localDatanodeInfo.removeBlockLocalPathInfo(blk); - LOG.warn("BlockReaderLocalLegacy: Removing " + blk - + " from cache because local file " + pathinfo.getBlockPath() - + " could not be opened."); - throw e; - } finally { - if (localBlockReader == null) { - if (dataIn != null) { - dataIn.close(); - } - if (checksumIn != null) { - checksumIn.close(); - } - } - } - return localBlockReader; - } - - private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) { - LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port); - if (ldInfo == null) { - ldInfo = new LocalDatanodeInfo(); - localDatanodeInfoMap.put(port, ldInfo); - } - return ldInfo; - } - - private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi, - ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout, - Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname, - StorageType storageType) throws IOException { - LocalDatanodeInfo localDatanodeInfo = - getLocalDatanodeInfo(node.getIpcPort()); - BlockLocalPathInfo pathinfo; - ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node, - conf, timeout, connectToDnViaHostname); - try { - // make RPC to local datanode to find local pathnames of blocks - pathinfo = proxy.getBlockLocalPathInfo(blk, token); - // We can't cache the path information for a replica on transient storage. - // If the replica gets evicted, then it moves to a different path. Then, - // our next attempt to read from the cached path would fail to find the - // file. Additionally, the failure would cause us to disable legacy - // short-circuit read for all subsequent use in the ClientContext. Unlike - // the newer short-circuit read implementation, we have no communication - // channel for the DataNode to notify the client that the path has been - // invalidated. Therefore, our only option is to skip caching. 
- if (pathinfo != null && !storageType.isTransient()) { - LOG.debug("Cached location of block {} as {}", blk, pathinfo); - localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo); - } - } catch (IOException e) { - localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error - throw e; - } - return pathinfo; - } - - private static int getSlowReadBufferNumChunks(int bufferSizeBytes, - int bytesPerChecksum) { - if (bufferSizeBytes < bytesPerChecksum) { - throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " + - "buffer size (" + bufferSizeBytes + ") is not large enough to hold " + - "a single chunk (" + bytesPerChecksum + "). Please configure " + - HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY + - " appropriately"); - } - - // Round down to nearest chunk size - return bufferSizeBytes / bytesPerChecksum; - } - - private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, - ExtendedBlock block, long startOffset, FileInputStream dataIn, - Tracer tracer) throws IOException { - this(conf, hdfsfile, block, startOffset, - DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false, - dataIn, startOffset, null, tracer); - } - - private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, - ExtendedBlock block, long startOffset, DataChecksum checksum, - boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, - FileInputStream checksumIn, Tracer tracer) throws IOException { - this.filename = hdfsfile; - this.checksum = checksum; - this.verifyChecksum = verifyChecksum; - this.startOffset = Math.max(startOffset, 0); - this.blockId = block.getBlockId(); - - bytesPerChecksum = this.checksum.getBytesPerChecksum(); - checksumSize = this.checksum.getChecksumSize(); - - this.dataIn = dataIn; - this.checksumIn = checksumIn; - this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); - - final int chunksPerChecksumRead = getSlowReadBufferNumChunks( - conf.getShortCircuitBufferSize(), bytesPerChecksum); - slowReadBuff = bufferPool.getBuffer( - bytesPerChecksum * chunksPerChecksumRead); - checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); - // Initially the buffers have nothing to read. - slowReadBuff.flip(); - checksumBuff.flip(); - boolean success = false; - try { - // Skip both input streams to beginning of the chunk containing - // startOffset - IOUtils.skipFully(dataIn, firstChunkOffset); - if (checksumIn != null) { - long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * - checksumSize; - IOUtils.skipFully(checksumIn, checkSumOffset); - } - success = true; - } finally { - if (!success) { - bufferPool.returnBuffer(slowReadBuff); - bufferPool.returnBuffer(checksumBuff); - } - } - this.tracer = tracer; - } - - /** - * Reads bytes into a buffer until EOF or the buffer's limit is reached - */ - private int fillBuffer(FileInputStream stream, ByteBuffer buf) - throws IOException { - try (TraceScope ignored = tracer. - newScope("BlockReaderLocalLegacy#fillBuffer(" + blockId + ")")) { - int bytesRead = stream.getChannel().read(buf); - if (bytesRead < 0) { - //EOF - return bytesRead; - } - while (buf.remaining() > 0) { - int n = stream.getChannel().read(buf); - if (n < 0) { - //EOF - return bytesRead; - } - bytesRead += n; - } - return bytesRead; - } - } - - /** - * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into - * another. 
- */ - private void writeSlice(ByteBuffer from, ByteBuffer to, int length) { - int oldLimit = from.limit(); - from.limit(from.position() + length); - try { - to.put(from); - } finally { - from.limit(oldLimit); - } - } - - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - int nRead = 0; - if (verifyChecksum) { - // A 'direct' read actually has three phases. The first drains any - // remaining bytes from the slow read buffer. After this the read is - // guaranteed to be on a checksum chunk boundary. If there are still bytes - // to read, the fast direct path is used for as many remaining bytes as - // possible, up to a multiple of the checksum chunk size. Finally, any - // 'odd' bytes remaining at the end of the read cause another slow read to - // be issued, which involves an extra copy. - - // Every 'slow' read tries to fill the slow read buffer in one go for - // efficiency's sake. As described above, all non-checksum-chunk-aligned - // reads will be served from the slower read path. - - if (slowReadBuff.hasRemaining()) { - // There are remaining bytes from a small read available. This usually - // means this read is unaligned, which falls back to the slow path. - int fromSlowReadBuff = Math.min(buf.remaining(), - slowReadBuff.remaining()); - writeSlice(slowReadBuff, buf, fromSlowReadBuff); - nRead += fromSlowReadBuff; - } - - if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) { - // Since we have drained the 'small read' buffer, we are guaranteed to - // be chunk-aligned - int len = buf.remaining() - (buf.remaining() % bytesPerChecksum); - - // There's only enough checksum buffer space available to checksum one - // entire slow read buffer. This saves keeping the number of checksum - // chunks around. - len = Math.min(len, slowReadBuff.capacity()); - int oldlimit = buf.limit(); - buf.limit(buf.position() + len); - int readResult = 0; - try { - readResult = doByteBufferRead(buf); - } finally { - buf.limit(oldlimit); - } - if (readResult == -1) { - return nRead; - } else { - nRead += readResult; - buf.position(buf.position() + readResult); - } - } - - // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read - // until chunk boundary - if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || - offsetFromChunkBoundary > 0) { - int toRead = Math.min(buf.remaining(), - bytesPerChecksum - offsetFromChunkBoundary); - int readResult = fillSlowReadBuffer(toRead); - if (readResult == -1) { - return nRead; - } else { - int fromSlowReadBuff = Math.min(readResult, buf.remaining()); - writeSlice(slowReadBuff, buf, fromSlowReadBuff); - nRead += fromSlowReadBuff; - } - } - } else { - // Non-checksummed reads are much easier; we can just fill the buffer - // directly. - nRead = doByteBufferRead(buf); - if (nRead > 0) { - buf.position(buf.position() + nRead); - } - } - return nRead; - } - - /** - * Tries to read as many bytes as possible into supplied buffer, checksumming - * each chunk if needed. - * - * <b>Preconditions:</b> - * <ul> - * <li> - * If checksumming is enabled, buf.remaining must be a multiple of - * bytesPerChecksum. Note that this is not a requirement for clients of - * read(ByteBuffer) - in the case of non-checksum-sized read requests, - * read(ByteBuffer) will substitute a suitably sized buffer to pass to this - * method. 
- * </li> - * </ul> - * <b>Postconditions:</b> - * <ul> - * <li>buf.limit and buf.mark are unchanged.</li> - * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the - * requested bytes can be read straight from the buffer</li> - * </ul> - * - * @param buf - * byte buffer to write bytes to. If checksums are not required, buf - * can have any number of bytes remaining, otherwise there must be a - * multiple of the checksum chunk size remaining. - * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt> - * that is, the the number of useful bytes (up to the amount - * requested) readable from the buffer by the client. - */ - private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException { - if (verifyChecksum) { - assert buf.remaining() % bytesPerChecksum == 0; - } - int dataRead; - - int oldpos = buf.position(); - // Read as much as we can into the buffer. - dataRead = fillBuffer(dataIn, buf); - - if (dataRead == -1) { - return -1; - } - - if (verifyChecksum) { - ByteBuffer toChecksum = buf.duplicate(); - toChecksum.position(oldpos); - toChecksum.limit(oldpos + dataRead); - - checksumBuff.clear(); - // Equivalent to - // (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum ); - int numChunks = - (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum; - checksumBuff.limit(checksumSize * numChunks); - - fillBuffer(checksumIn, checksumBuff); - checksumBuff.flip(); - - checksum.verifyChunkedSums(toChecksum, checksumBuff, filename, - this.startOffset); - } - - if (dataRead >= 0) { - buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead)); - } - - if (dataRead < offsetFromChunkBoundary) { - // yikes, didn't even get enough bytes to honour offset. This can happen - // even if we are verifying checksums if we are at EOF. - offsetFromChunkBoundary -= dataRead; - dataRead = 0; - } else { - dataRead -= offsetFromChunkBoundary; - offsetFromChunkBoundary = 0; - } - - return dataRead; - } - - /** - * Ensures that up to len bytes are available and checksummed in the slow read - * buffer. The number of bytes available to read is returned. If the buffer is - * not already empty, the number of remaining bytes is returned and no actual - * read happens. - * - * @param len - * the maximum number of bytes to make available. After len bytes - * are read, the underlying bytestream <b>must</b> be at a checksum - * boundary, or EOF. That is, (len + currentPosition) % - * bytesPerChecksum == 0. - * @return the number of bytes available to read, or -1 if EOF. - */ - private synchronized int fillSlowReadBuffer(int len) throws IOException { - int nRead; - if (slowReadBuff.hasRemaining()) { - // Already got data, good to go. - nRead = Math.min(len, slowReadBuff.remaining()); - } else { - // Round a complete read of len bytes (plus any implicit offset) to the - // next chunk boundary, since we try and read in multiples of a chunk - int nextChunk = len + offsetFromChunkBoundary + - (bytesPerChecksum - - ((len + offsetFromChunkBoundary) % bytesPerChecksum)); - int limit = Math.min(nextChunk, slowReadBuff.capacity()); - assert limit % bytesPerChecksum == 0; - - slowReadBuff.clear(); - slowReadBuff.limit(limit); - - nRead = doByteBufferRead(slowReadBuff); - - if (nRead > 0) { - // So that next time we call slowReadBuff.hasRemaining(), we don't get a - // false positive. 
- slowReadBuff.limit(nRead + slowReadBuff.position()); - } - } - return nRead; - } - - @Override - public synchronized int read(byte[] buf, int off, int len) - throws IOException { - LOG.trace("read off {} len {}", off, len); - if (!verifyChecksum) { - return dataIn.read(buf, off, len); - } - - int nRead = fillSlowReadBuffer(slowReadBuff.capacity()); - - if (nRead > 0) { - // Possible that buffer is filled with a larger read than we need, since - // we tried to read as much as possible at once - nRead = Math.min(len, nRead); - slowReadBuff.get(buf, off, nRead); - } - - return nRead; - } - - @Override - public synchronized long skip(long n) throws IOException { - LOG.debug("skip {}", n); - if (n <= 0) { - return 0; - } - if (!verifyChecksum) { - return dataIn.skip(n); - } - - // caller made sure newPosition is not beyond EOF. - int remaining = slowReadBuff.remaining(); - int position = slowReadBuff.position(); - int newPosition = position + (int)n; - - // if the new offset is already read into dataBuff, just reposition - if (n <= remaining) { - assert offsetFromChunkBoundary == 0; - slowReadBuff.position(newPosition); - return n; - } - - // for small gap, read through to keep the data/checksum in sync - if (n - remaining <= bytesPerChecksum) { - slowReadBuff.position(position + remaining); - if (skipBuf == null) { - skipBuf = new byte[bytesPerChecksum]; - } - int ret = read(skipBuf, 0, (int)(n - remaining)); - return (remaining + ret); - } - - // optimize for big gap: discard the current buffer, skip to - // the beginning of the appropriate checksum chunk and then - // read to the middle of that chunk to be in sync with checksums. - - // We can't use this.offsetFromChunkBoundary because we need to know how - // many bytes of the offset were really read. Calling read(..) with a - // positive this.offsetFromChunkBoundary causes that many bytes to get - // silently skipped. - int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum; - long toskip = n - remaining - myOffsetFromChunkBoundary; - - slowReadBuff.position(slowReadBuff.limit()); - checksumBuff.position(checksumBuff.limit()); - - IOUtils.skipFully(dataIn, toskip); - long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize; - IOUtils.skipFully(checksumIn, checkSumOffset); - - // read into the middle of the chunk - if (skipBuf == null) { - skipBuf = new byte[bytesPerChecksum]; - } - assert skipBuf.length == bytesPerChecksum; - assert myOffsetFromChunkBoundary < bytesPerChecksum; - - int ret = read(skipBuf, 0, myOffsetFromChunkBoundary); - - if (ret == -1) { // EOS - return (toskip + remaining); - } else { - return (toskip + remaining + ret); - } - } - - @Override - public synchronized void close() throws IOException { - IOUtilsClient.cleanup(LOG, dataIn, checksumIn); - if (slowReadBuff != null) { - bufferPool.returnBuffer(slowReadBuff); - slowReadBuff = null; - } - if (checksumBuff != null) { - bufferPool.returnBuffer(checksumBuff); - checksumBuff = null; - } - startOffset = -1; - checksum = null; - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return BlockReaderUtil.readAll(this, buf, offset, len); - } - - @Override - public void readFully(byte[] buf, int off, int len) throws IOException { - BlockReaderUtil.readFully(this, buf, off, len); - } - - @Override - public int available() { - // We never do network I/O in BlockReaderLocalLegacy. 
- return Integer.MAX_VALUE; - } - - @Override - public boolean isShortCircuit() { - return true; - } - - @Override - public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { - return null; - } - - @Override - public DataChecksum getDataChecksum() { - return checksum; - } - - @Override - public int getNetworkDistance() { - return 0; - } -}
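The LocalDatanodeInfo class in the file deleted above builds its block-path cache from a standard idiom: an access-ordered LinkedHashMap whose removeEldestEntry hook evicts the least-recently-used entry once a size cap is exceeded, wrapped in Collections.synchronizedMap for thread safety. Below is a minimal self-contained sketch of that same pattern; the LruCache name and generic signature are illustrative, not part of the patch.

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of the LinkedHashMap-based LRU pattern used by LocalDatanodeInfo. */
class LruCache<K, V> {
  private final Map<K, V> map;

  LruCache(final int maxEntries) {
    final float loadFactor = 0.75f;
    // Size the table so it never rehashes before reaching maxEntries.
    final int capacity = (int) Math.ceil(maxEntries / loadFactor) + 1;
    map = Collections.synchronizedMap(
        new LinkedHashMap<K, V>(capacity, loadFactor, true /* access order */) {
          private static final long serialVersionUID = 1;

          @Override
          protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            // Invoked after every put(); returning true drops the
            // least-recently-accessed entry.
            return size() > maxEntries;
          }
        });
  }

  V get(K key) { return map.get(key); }
  void put(K key, V value) { map.put(key, value); }
  void remove(K key) { map.remove(key); }
}

As in the deleted reader, the synchronized wrapper protects individual map operations only, and eviction stays invisible to callers.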
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java deleted file mode 100644 index 85f925f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.hadoop.classification.InterfaceAudience; - -import java.io.IOException; - -/** - * For sharing between the local and remote block reader implementations. - */ -@InterfaceAudience.Private -class BlockReaderUtil { - - /* See {@link BlockReader#readAll(byte[], int, int)} */ - public static int readAll(BlockReader reader, - byte[] buf, int offset, int len) throws IOException { - int n = 0; - for (;;) { - int nread = reader.read(buf, offset + n, len - n); - if (nread <= 0) - return (n == 0) ?
nread : n; - n += nread; - if (n >= len) - return n; - } - } - - /* See {@link BlockReader#readFully(byte[], int, int)} */ - public static void readFully(BlockReader reader, - byte[] buf, int off, int len) throws IOException { - int toRead = len; - while (toRead > 0) { - int ret = reader.read(buf, off, toRead); - if (ret < 0) { - throw new IOException("Premature EOF from inputStream"); - } - toRead -= ret; - off += ret; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 9e67ff2..2ed0abd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -56,6 +56,7 @@ import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; +import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java deleted file mode 100644 index 707a56a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.EnumSet; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.util.DataChecksum; - -/** - * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from - * replicas. 
- */ -@InterfaceAudience.Private -public final class ExternalBlockReader implements BlockReader { - private final ReplicaAccessor accessor; - private final long visibleLength; - private long pos; - - ExternalBlockReader(ReplicaAccessor accessor, long visibleLength, - long startOffset) { - this.accessor = accessor; - this.visibleLength = visibleLength; - this.pos = startOffset; - } - - @Override - public int read(byte[] buf, int off, int len) throws IOException { - int nread = accessor.read(pos, buf, off, len); - if (nread < 0) { - return nread; - } - pos += nread; - return nread; - } - - @Override - public int read(ByteBuffer buf) throws IOException { - int nread = accessor.read(pos, buf); - if (nread < 0) { - return nread; - } - pos += nread; - return nread; - } - - @Override - public long skip(long n) throws IOException { - // You cannot skip backwards - if (n <= 0) { - return 0; - } - // You can't skip past the last offset that we want to read with this - // block reader. - long oldPos = pos; - pos += n; - if (pos > visibleLength) { - pos = visibleLength; - } - return pos - oldPos; - } - - @Override - public int available() { - // We return the number of bytes between the current offset and the visible - // length. Some of the other block readers return a shorter length than - // that. The only advantage to returning a shorter length is that the - // DFSInputStream will trash your block reader and create a new one if - // someone tries to seek() beyond the available() region. - long diff = visibleLength - pos; - if (diff > Integer.MAX_VALUE) { - return Integer.MAX_VALUE; - } else { - return (int)diff; - } - } - - @Override - public void close() throws IOException { - accessor.close(); - } - - @Override - public void readFully(byte[] buf, int offset, int len) throws IOException { - BlockReaderUtil.readFully(this, buf, offset, len); - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return BlockReaderUtil.readAll(this, buf, offset, len); - } - - @Override - public boolean isShortCircuit() { - return accessor.isShortCircuit(); - } - - @Override - public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { - // For now, pluggable ReplicaAccessors do not support zero-copy. - return null; - } - - @Override - public DataChecksum getDataChecksum() { - return null; - } - - @Override - public int getNetworkDistance() { - return accessor.getNetworkDistance(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java deleted file mode 100644 index 7e094f5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ /dev/null @@ -1,510 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.EnumSet; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.FSInputChecker; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.core.TraceScope; -import org.apache.htrace.core.Tracer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * @deprecated this is an old implementation that is being left around - * in case any issues spring up with the new {@link RemoteBlockReader2} - * implementation. - * It will be removed in the next release. - */ -@InterfaceAudience.Private -@Deprecated -public class RemoteBlockReader extends FSInputChecker implements BlockReader { - static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class); - - private final Peer peer; - private final DatanodeID datanodeID; - private final DataInputStream in; - private DataChecksum checksum; - - /** offset in block of the last chunk received */ - private long lastChunkOffset = -1; - private long lastChunkLen = -1; - private long lastSeqNo = -1; - - /** offset in block where reader wants to actually read */ - private long startOffset; - - private final long blockId; - - /** offset in block of first chunk - may be less than startOffset - if startOffset is not chunk-aligned */ - private final long firstChunkOffset; - - private final int bytesPerChecksum; - private final int checksumSize; - - /** - * The total number of bytes we need to transfer from the DN. - * This is the amount that the user has requested plus some padding - * at the beginning so that the read can begin on a chunk boundary.
- */ - private final long bytesNeededToFinish; - - private boolean eos = false; - private boolean sentStatusCode = false; - - ByteBuffer checksumBytes = null; - /** Amount of unread data in the current received packet */ - int dataLeft = 0; - - private final PeerCache peerCache; - - private final Tracer tracer; - - private final int networkDistance; - - /* FSInputChecker interface */ - - /* same interface as inputStream java.io.InputStream#read() - * used by DFSInputStream#read() - * This violates one rule when there is a checksum error: - * "Read should not modify user buffer before successful read" - * because it first reads the data to user buffer and then checks - * the checksum. - */ - @Override - public synchronized int read(byte[] buf, int off, int len) - throws IOException { - - // This has to be set here, *before* the skip, since we can - // hit EOS during the skip, in the case that our entire read - // is smaller than the checksum chunk. - boolean eosBefore = eos; - - //for the first read, skip the extra bytes at the front. - if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { - // Skip these bytes. But don't call this.skip()! - int toSkip = (int)(startOffset - firstChunkOffset); - if ( super.readAndDiscard(toSkip) != toSkip ) { - // should never happen - throw new IOException("Could not skip required number of bytes"); - } - } - - int nRead = super.read(buf, off, len); - - // if eos was set in the previous read, send a status code to the DN - if (eos && !eosBefore && nRead >= 0) { - if (needChecksum()) { - sendReadResult(peer, Status.CHECKSUM_OK); - } else { - sendReadResult(peer, Status.SUCCESS); - } - } - return nRead; - } - - @Override - public synchronized long skip(long n) throws IOException { - /* How can we make sure we don't throw a ChecksumException, at least - * in majority of the cases?. This one throws. */ - long nSkipped = 0; - while (nSkipped < n) { - int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE); - int ret = readAndDiscard(toSkip); - if (ret <= 0) { - return nSkipped; - } - nSkipped += ret; - } - return nSkipped; - } - - @Override - public int read() throws IOException { - throw new IOException("read() is not expected to be invoked. " + - "Use read(buf, off, len) instead."); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - /* Checksum errors are handled outside the BlockReader. - * DFSInputStream does not always call 'seekToNewSource'. In the - * case of pread(), it just tries a different replica without seeking. - */ - return false; - } - - @Override - public void seek(long pos) throws IOException { - throw new IOException("Seek() is not supported in BlockInputChecker"); - } - - @Override - protected long getChunkPosition(long pos) { - throw new RuntimeException("getChunkPosition() is not supported, " + - "since seek is not required"); - } - - /** - * Makes sure that checksumBytes has enough capacity - * and limit is set to the number of checksum bytes needed - * to be read. 
- */ - private void adjustChecksumBytes(int dataLen) { - int requiredSize = - ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; - if (checksumBytes == null || requiredSize > checksumBytes.capacity()) { - checksumBytes = ByteBuffer.wrap(new byte[requiredSize]); - } else { - checksumBytes.clear(); - } - checksumBytes.limit(requiredSize); - } - - @Override - protected synchronized int readChunk(long pos, byte[] buf, int offset, - int len, byte[] checksumBuf) - throws IOException { - try (TraceScope ignored = tracer.newScope( - "RemoteBlockReader#readChunk(" + blockId + ")")) { - return readChunkImpl(pos, buf, offset, len, checksumBuf); - } - } - - private synchronized int readChunkImpl(long pos, byte[] buf, int offset, - int len, byte[] checksumBuf) - throws IOException { - // Read one chunk. - if (eos) { - // Already hit EOF - return -1; - } - - // Read one DATA_CHUNK. - long chunkOffset = lastChunkOffset; - if ( lastChunkLen > 0 ) { - chunkOffset += lastChunkLen; - } - - // pos is relative to the start of the first chunk of the read. - // chunkOffset is relative to the start of the block. - // This makes sure that the read passed from FSInputChecker is the - // for the same chunk we expect to be reading from the DN. - if ( (pos + firstChunkOffset) != chunkOffset ) { - throw new IOException("Mismatch in pos : " + pos + " + " + - firstChunkOffset + " != " + chunkOffset); - } - - // Read next packet if the previous packet has been read completely. - if (dataLeft <= 0) { - //Read packet headers. - PacketHeader header = new PacketHeader(); - header.readFields(in); - - LOG.debug("DFSClient readChunk got header {}", header); - - // Sanity check the lengths - if (!header.sanityCheck(lastSeqNo)) { - throw new IOException("BlockReader: error in packet header " + - header); - } - - lastSeqNo = header.getSeqno(); - dataLeft = header.getDataLen(); - adjustChecksumBytes(header.getDataLen()); - if (header.getDataLen() > 0) { - IOUtils.readFully(in, checksumBytes.array(), 0, - checksumBytes.limit()); - } - } - - // Sanity checks - assert len >= bytesPerChecksum; - assert checksum != null; - assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0); - - - int checksumsToRead, bytesToRead; - - if (checksumSize > 0) { - - // How many chunks left in our packet - this is a ceiling - // since we may have a partial chunk at the end of the file - int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1; - - // How many chunks we can fit in databuffer - // - note this is a floor since we always read full chunks - int chunksCanFit = Math.min(len / bytesPerChecksum, - checksumBuf.length / checksumSize); - - // How many chunks should we read - checksumsToRead = Math.min(chunksLeft, chunksCanFit); - // How many bytes should we actually read - bytesToRead = Math.min( - checksumsToRead * bytesPerChecksum, // full chunks - dataLeft); // in case we have a partial - } else { - // no checksum - bytesToRead = Math.min(dataLeft, len); - checksumsToRead = 0; - } - - if ( bytesToRead > 0 ) { - // Assert we have enough space - assert bytesToRead <= len; - assert checksumBytes.remaining() >= checksumSize * checksumsToRead; - assert checksumBuf.length >= checksumSize * checksumsToRead; - IOUtils.readFully(in, buf, offset, bytesToRead); - checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead); - } - - dataLeft -= bytesToRead; - assert dataLeft >= 0; - - lastChunkOffset = chunkOffset; - lastChunkLen = bytesToRead; - - // If there's no data left in the current packet after satisfying - // this read, 
and we have satisfied the client read, we expect - // an empty packet header from the DN to signify this. - // Note that pos + bytesToRead may in fact be greater since the - // DN finishes off the entire last chunk. - if (dataLeft == 0 && - pos + bytesToRead >= bytesNeededToFinish) { - - // Read header - PacketHeader hdr = new PacketHeader(); - hdr.readFields(in); - - if (!hdr.isLastPacketInBlock() || - hdr.getDataLen() != 0) { - throw new IOException("Expected empty end-of-read packet! Header: " + - hdr); - } - - eos = true; - } - - if ( bytesToRead == 0 ) { - return -1; - } - - return bytesToRead; - } - - private RemoteBlockReader(String file, String bpid, long blockId, - DataInputStream in, DataChecksum checksum, boolean verifyChecksum, - long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID, PeerCache peerCache, Tracer tracer, - int networkDistance) { - // Path is used only for printing block and file information in debug - super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId + - ":" + bpid + ":of:"+ file)/*too non path-like?*/, - 1, verifyChecksum, - checksum.getChecksumSize() > 0? checksum : null, - checksum.getBytesPerChecksum(), - checksum.getChecksumSize()); - - this.peer = peer; - this.datanodeID = datanodeID; - this.in = in; - this.checksum = checksum; - this.startOffset = Math.max( startOffset, 0 ); - this.blockId = blockId; - - // The total number of bytes that we need to transfer from the DN is - // the amount that the user wants (bytesToRead), plus the padding at - // the beginning in order to chunk-align. Note that the DN may elect - // to send more than this amount if the read starts/ends mid-chunk. - this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); - - this.firstChunkOffset = firstChunkOffset; - lastChunkOffset = firstChunkOffset; - lastChunkLen = -1; - - bytesPerChecksum = this.checksum.getBytesPerChecksum(); - checksumSize = this.checksum.getChecksumSize(); - this.peerCache = peerCache; - this.tracer = tracer; - this.networkDistance = networkDistance; - } - - /** - * Create a new BlockReader specifically to satisfy a read. - * This method also sends the OP_READ_BLOCK request. - * - * @param file File location - * @param block The block object - * @param blockToken The block token for security - * @param startOffset The read offset, relative to block head - * @param len The number of bytes to read - * @param bufferSize The IO buffer size (not the client buffer size) - * @param verifyChecksum Whether to verify checksum - * @param clientName Client name - * @return New BlockReader instance, or null on error. 
- */ - public static RemoteBlockReader newBlockReader(String file, - ExtendedBlock block, - Token<BlockTokenIdentifier> blockToken, - long startOffset, long len, - int bufferSize, boolean verifyChecksum, - String clientName, Peer peer, - DatanodeID datanodeID, - PeerCache peerCache, - CachingStrategy cachingStrategy, - Tracer tracer, int networkDistance) - throws IOException { - // in and out will be closed when sock is closed (by the caller) - final DataOutputStream out = - new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); - new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, - verifyChecksum, cachingStrategy); - - // - // Get bytes in block, set streams - // - - DataInputStream in = new DataInputStream( - new BufferedInputStream(peer.getInputStream(), bufferSize)); - - BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); - RemoteBlockReader2.checkSuccess(status, peer, block, file); - ReadOpChecksumInfoProto checksumInfo = - status.getReadOpChecksumInfo(); - DataChecksum checksum = DataTransferProtoUtil.fromProto( - checksumInfo.getChecksum()); - //Warning when we get CHECKSUM_NULL? - - // Read the first chunk offset. - long firstChunkOffset = checksumInfo.getChunkOffset(); - - if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || - firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { - throw new IOException("BlockReader: error in first chunk offset (" + - firstChunkOffset + ") startOffset is " + - startOffset + " for file " + file); - } - - return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), - in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, - peer, datanodeID, peerCache, tracer, networkDistance); - } - - @Override - public synchronized void close() throws IOException { - startOffset = -1; - checksum = null; - if (peerCache != null & sentStatusCode) { - peerCache.put(datanodeID, peer); - } else { - peer.close(); - } - - // in will be closed when its Socket is closed. - } - - @Override - public void readFully(byte[] buf, int readOffset, int amtToRead) - throws IOException { - IOUtils.readFully(this, buf, readOffset, amtToRead); - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return readFully(this, buf, offset, len); - } - - /** - * When the reader reaches end of the read, it sends a status response - * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN - * closing our connection (which we will re-open), but won't affect - * data correctness. - */ - void sendReadResult(Peer peer, Status statusCode) { - assert !sentStatusCode : "already sent status code to " + peer; - try { - RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode); - sentStatusCode = true; - } catch (IOException e) { - // It's ok not to be able to send this. But something is probably wrong. - LOG.info("Could not send read status (" + statusCode + ") to datanode " + - peer.getRemoteAddressString() + ": " + e.getMessage()); - } - } - - @Override - public int read(ByteBuffer buf) throws IOException { - throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader"); - } - - @Override - public int available() { - // An optimistic estimate of how much data is available - // to us without doing network I/O. 
- return RemoteBlockReader2.TCP_WINDOW_SIZE; - } - - @Override - public boolean isShortCircuit() { - return false; - } - - @Override - public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { - return null; - } - - @Override - public DataChecksum getDataChecksum() { - return checksum; - } - - @Override - public int getNetworkDistance() { - return networkDistance; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java deleted file mode 100644 index 9437353..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ /dev/null @@ -1,472 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs; - -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.util.EnumSet; -import java.util.UUID; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.core.TraceScope; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.htrace.core.Tracer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a wrapper around a connection to a datanode - * that understands checksums, offsets, etc. - * - * Terminology: - * <dl> - * <dt>block</dt> - * <dd>The hdfs block, typically large (~64MB). - * </dd> - * <dt>chunk</dt> - * <dd>A block is divided into chunks, each comes with a checksum. - * We want transfers to be chunk-aligned, to be able to - * verify checksums. - * </dd> - * <dt>packet</dt> - * <dd>A grouping of chunks used for transport. It contains a - * header, followed by checksum data, followed by real data. - * </dd> - * </dl> - * Please see DataNode for the RPC specification. - * - * This is a new implementation introduced in Hadoop 0.23 which - * is more efficient and simpler than the older BlockReader - * implementation. It should be renamed to RemoteBlockReader - * once we are confident in it. - */ -@InterfaceAudience.Private -public class RemoteBlockReader2 implements BlockReader { - - static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class); - static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB - - final private Peer peer; - final private DatanodeID datanodeID; - final private PeerCache peerCache; - final private long blockId; - private final ReadableByteChannel in; - - private DataChecksum checksum; - private final PacketReceiver packetReceiver = new PacketReceiver(true); - - private ByteBuffer curDataSlice = null; - - /** sequence number of the last packet received */ - private long lastSeqNo = -1; - - /** offset in block where reader wants to actually read */ - private long startOffset; - private final String filename; - - private final int bytesPerChecksum; - private final int checksumSize; - - /** - * The total number of bytes we need to transfer from the DN.
- * This is the amount that the user has requested plus some padding - * at the beginning so that the read can begin on a chunk boundary. - */ - private long bytesNeededToFinish; - - private final boolean verifyChecksum; - - private boolean sentStatusCode = false; - - private final Tracer tracer; - - private final int networkDistance; - - @VisibleForTesting - public Peer getPeer() { - return peer; - } - - @Override - public synchronized int read(byte[] buf, int off, int len) - throws IOException { - UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null); - LOG.trace("Starting read #{} file {} from datanode {}", - randomId, filename, datanodeID.getHostName()); - - if (curDataSlice == null || - curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - try (TraceScope ignored = tracer.newScope( - "RemoteBlockReader2#readNextPacket(" + blockId + ")")) { - readNextPacket(); - } - } - - LOG.trace("Finishing read #{}", randomId); - - if (curDataSlice.remaining() == 0) { - // we're at EOF now - return -1; - } - - int nRead = Math.min(curDataSlice.remaining(), len); - curDataSlice.get(buf, off, nRead); - - return nRead; - } - - - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - if (curDataSlice == null || - (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) { - try (TraceScope ignored = tracer.newScope( - "RemoteBlockReader2#readNextPacket(" + blockId + ")")) { - readNextPacket(); - } - } - if (curDataSlice.remaining() == 0) { - // we're at EOF now - return -1; - } - - int nRead = Math.min(curDataSlice.remaining(), buf.remaining()); - ByteBuffer writeSlice = curDataSlice.duplicate(); - writeSlice.limit(writeSlice.position() + nRead); - buf.put(writeSlice); - curDataSlice.position(writeSlice.position()); - - return nRead; - } - - private void readNextPacket() throws IOException { - //Read packet headers. - packetReceiver.receiveNextPacket(in); - - PacketHeader curHeader = packetReceiver.getHeader(); - curDataSlice = packetReceiver.getDataSlice(); - assert curDataSlice.capacity() == curHeader.getDataLen(); - - LOG.trace("DFSClient readNextPacket got header {}", curHeader); - - // Sanity check the lengths - if (!curHeader.sanityCheck(lastSeqNo)) { - throw new IOException("BlockReader: error in packet header " + - curHeader); - } - - if (curHeader.getDataLen() > 0) { - int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; - int checksumsLen = chunks * checksumSize; - - assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : - "checksum slice capacity=" + - packetReceiver.getChecksumSlice().capacity() + - " checksumsLen=" + checksumsLen; - - lastSeqNo = curHeader.getSeqno(); - if (verifyChecksum && curDataSlice.remaining() > 0) { - // N.B.: the checksum error offset reported here is actually - // relative to the start of the block, not the start of the file. - // This is slightly misleading, but preserves the behavior from - // the older BlockReader. - checksum.verifyChunkedSums(curDataSlice, - packetReceiver.getChecksumSlice(), - filename, curHeader.getOffsetInBlock()); - } - bytesNeededToFinish -= curHeader.getDataLen(); - } - - // First packet will include some data prior to the first byte - // the user requested. Skip it. 
-    if (curHeader.getOffsetInBlock() < startOffset) {
-      int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
-      curDataSlice.position(newPos);
-    }
-
-    // If we've now satisfied the whole client read, read one last packet
-    // header, which should be empty.
-    if (bytesNeededToFinish <= 0) {
-      readTrailingEmptyPacket();
-      if (verifyChecksum) {
-        sendReadResult(Status.CHECKSUM_OK);
-      } else {
-        sendReadResult(Status.SUCCESS);
-      }
-    }
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in the majority of cases? This implementation can throw one. */
-    long skipped = 0;
-    while (skipped < n) {
-      long needToSkip = n - skipped;
-      if (curDataSlice == null ||
-          (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
-        readNextPacket();
-      }
-      if (curDataSlice.remaining() == 0) {
-        // we're at EOF now
-        break;
-      }
-
-      int skip = (int) Math.min(curDataSlice.remaining(), needToSkip);
-      curDataSlice.position(curDataSlice.position() + skip);
-      skipped += skip;
-    }
-    return skipped;
-  }
-
-  private void readTrailingEmptyPacket() throws IOException {
-    LOG.trace("Reading empty packet at end of read");
-
-    packetReceiver.receiveNextPacket(in);
-
-    PacketHeader trailer = packetReceiver.getHeader();
-    if (!trailer.isLastPacketInBlock() ||
-        trailer.getDataLen() != 0) {
-      throw new IOException("Expected empty end-of-read packet! Header: " +
-          trailer);
-    }
-  }
-
-  protected RemoteBlockReader2(String file, long blockId,
-      DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
-      int networkDistance) {
-    // Path is used only for printing block and file information in debug.
-    this.peer = peer;
-    this.datanodeID = datanodeID;
-    this.in = peer.getInputStreamChannel();
-    this.checksum = checksum;
-    this.verifyChecksum = verifyChecksum;
-    this.startOffset = Math.max(startOffset, 0);
-    this.filename = file;
-    this.peerCache = peerCache;
-    this.blockId = blockId;
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-    this.tracer = tracer;
-    this.networkDistance = networkDistance;
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    packetReceiver.close();
-    startOffset = -1;
-    checksum = null;
-    if (peerCache != null && sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-
-  /**
-   * When the reader reaches the end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
-    try {
-      writeReadResult(peer.getOutputStream(), statusCode);
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's OK not to be able to send this, but something is probably wrong.
- LOG.info("Could not send read status (" + statusCode + ") to datanode " + - peer.getRemoteAddressString() + ": " + e.getMessage()); - } - } - - /** - * Serialize the actual read result on the wire. - */ - static void writeReadResult(OutputStream out, Status statusCode) - throws IOException { - - ClientReadStatusProto.newBuilder() - .setStatus(statusCode) - .build() - .writeDelimitedTo(out); - - out.flush(); - } - - /** - * File name to print when accessing a block directly (from servlets) - * @param s Address of the block location - * @param poolId Block pool ID of the block - * @param blockId Block ID of the block - * @return string that has a file name for debug purposes - */ - public static String getFileName(final InetSocketAddress s, - final String poolId, final long blockId) { - return s.toString() + ":" + poolId + ":" + blockId; - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return BlockReaderUtil.readAll(this, buf, offset, len); - } - - @Override - public void readFully(byte[] buf, int off, int len) throws IOException { - BlockReaderUtil.readFully(this, buf, off, len); - } - - /** - * Create a new BlockReader specifically to satisfy a read. - * This method also sends the OP_READ_BLOCK request. - * - * @param file File location - * @param block The block object - * @param blockToken The block token for security - * @param startOffset The read offset, relative to block head - * @param len The number of bytes to read - * @param verifyChecksum Whether to verify checksum - * @param clientName Client name - * @param peer The Peer to use - * @param datanodeID The DatanodeID this peer is connected to - * @return New BlockReader instance, or null on error. - */ - public static BlockReader newBlockReader(String file, - ExtendedBlock block, - Token<BlockTokenIdentifier> blockToken, - long startOffset, long len, - boolean verifyChecksum, - String clientName, - Peer peer, DatanodeID datanodeID, - PeerCache peerCache, - CachingStrategy cachingStrategy, - Tracer tracer, - int networkDistance) throws IOException { - // in and out will be closed when sock is closed (by the caller) - final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - peer.getOutputStream())); - new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, - verifyChecksum, cachingStrategy); - - // - // Get bytes in block - // - DataInputStream in = new DataInputStream(peer.getInputStream()); - - BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); - checkSuccess(status, peer, block, file); - ReadOpChecksumInfoProto checksumInfo = - status.getReadOpChecksumInfo(); - DataChecksum checksum = DataTransferProtoUtil.fromProto( - checksumInfo.getChecksum()); - //Warning when we get CHECKSUM_NULL? - - // Read the first chunk offset. 
-    long firstChunkOffset = checksumInfo.getChunkOffset();
-
-    if (firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-          firstChunkOffset + ") startOffset is " +
-          startOffset + " for file " + file);
-    }
-
-    return new RemoteBlockReader2(file, block.getBlockId(), checksum,
-        verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
-        peerCache, tracer, networkDistance);
-  }
-
-  static void checkSuccess(
-      BlockOpResponseProto status, Peer peer,
-      ExtendedBlock block, String file)
-      throws IOException {
-    String logInfo = "for OP_READ_BLOCK"
-        + ", self=" + peer.getLocalAddressString()
-        + ", remote=" + peer.getRemoteAddressString()
-        + ", for file " + file
-        + ", for pool " + block.getBlockPoolId()
-        + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
-    DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
-  }
-
-  @Override
-  public int available() {
-    // An optimistic estimate of how much data is available
-    // to us without doing network I/O.
-    return TCP_WINDOW_SIZE;
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return false;
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    return null;
-  }
-
-  @Override
-  public DataChecksum getDataChecksum() {
-    return checksum;
-  }
-
-  @Override
-  public int getNetworkDistance() {
-    return networkDistance;
-  }
-}
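
The constructor and newBlockReader() above both rely on the same chunk-alignment arithmetic: the DN starts the stream at a chunk boundary at or before the requested offset, and the reader must account for that front padding. The following self-contained sketch reproduces those formulas with example numbers; the class name ChunkAlignmentSketch and all sample values are illustrative only, not part of this patch.

// A minimal, standalone illustration of the chunk-alignment arithmetic
// used by RemoteBlockReader2. Names and values here are hypothetical.
public class ChunkAlignmentSketch {
  public static void main(String[] args) {
    final int bytesPerChecksum = 512;  // one checksum per 512-byte chunk
    final long startOffset = 1000;     // first byte the user asked for
    final long bytesToRead = 4096;     // number of bytes the user asked for

    // The DN starts the stream at the chunk boundary at or before
    // startOffset, so the first chunk offset is:
    long firstChunkOffset = (startOffset / bytesPerChecksum) * bytesPerChecksum; // 512

    // Total bytes the client must pull off the wire: the requested range
    // plus the alignment padding at the front (cf. bytesNeededToFinish).
    long bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); // 4584

    // Checksum entries carried by a packet of dataLen data bytes
    // (cf. the chunks computation in readNextPacket()):
    int dataLen = 64 * 1024;
    int chunks = 1 + (dataLen - 1) / bytesPerChecksum; // 128

    System.out.println("firstChunkOffset=" + firstChunkOffset
        + " bytesNeededToFinish=" + bytesNeededToFinish
        + " chunksPerPacket=" + chunks);
  }
}

This is also why read() skips to (startOffset - firstChunkOffset) within the first packet: the leading padding bytes arrive on the wire but are never handed to the caller.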

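For completeness, here is a sketch of the end-of-read status handshake that sendReadResult()/writeReadResult() implement. It uses the same protobuf types the deleted class imports; ReadStatusSketch, sendStatus, and the bare OutputStream parameter (standing in for peer.getOutputStream()) are assumptions made for illustration.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

// Illustrative only: mirrors writeReadResult() from the class above.
final class ReadStatusSketch {
  static void sendStatus(OutputStream out, boolean checksumsVerified)
      throws IOException {
    // CHECKSUM_OK tells the DN the client verified every chunk it read;
    // SUCCESS makes no such claim. Either way the DN learns the read
    // completed cleanly.
    Status status = checksumsVerified ? Status.CHECKSUM_OK : Status.SUCCESS;
    ClientReadStatusProto.newBuilder()
        .setStatus(status)
        .build()
        .writeDelimitedTo(out);  // varint length prefix + serialized message
    out.flush();
  }
}

As the javadoc on sendReadResult() notes, skipping this message is harmless for data correctness, but the DN may then close the connection instead of letting close() return it to the PeerCache for reuse.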