http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java new file mode 100644 index 0000000..f4b62d9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java @@ -0,0 +1,878 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.InetSocketAddress; +import java.util.List; + +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.ClientContext; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.RemotePeerFactory; +import org.apache.hadoop.hdfs.ReplicaAccessor; +import org.apache.hadoop.hdfs.ReplicaAccessorBuilder; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.net.DomainPeer; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; +import 
org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; +import org.apache.hadoop.hdfs.util.IOUtilsClient; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.PerformanceAdvisory; +import org.apache.hadoop.util.Time; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.htrace.core.Tracer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utility class to create BlockReader implementations. + */ [email protected] +public class BlockReaderFactory implements ShortCircuitReplicaCreator { + static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class); + + public static class FailureInjector { + public void injectRequestFileDescriptorsFailure() throws IOException { + // do nothing + } + public boolean getSupportsReceiptVerification() { + return true; + } + } + + @VisibleForTesting + static ShortCircuitReplicaCreator + createShortCircuitReplicaInfoCallback = null; + + private final DfsClientConf conf; + + /** + * Injects failures into specific operations during unit tests. + */ + private static FailureInjector failureInjector = new FailureInjector(); + + /** + * The file name, for logging and debugging purposes. + */ + private String fileName; + + /** + * The block ID and block pool ID to use. + */ + private ExtendedBlock block; + + /** + * The block token to use for security purposes. + */ + private Token<BlockTokenIdentifier> token; + + /** + * The offset within the block to start reading at. + */ + private long startOffset; + + /** + * If false, we won't try to verify the block checksum. + */ + private boolean verifyChecksum; + + /** + * The name of this client. + */ + private String clientName; + + /** + * The DataNode we're talking to. + */ + private DatanodeInfo datanode; + + /** + * StorageType of replica on DataNode. + */ + private StorageType storageType; + + /** + * If false, we won't try short-circuit local reads. + */ + private boolean allowShortCircuitLocalReads; + + /** + * The ClientContext to use for things like the PeerCache. + */ + private ClientContext clientContext; + + /** + * Number of bytes to read. Must be set to a non-negative value. + */ + private long length = -1; + + /** + * Caching strategy to use when reading the block. + */ + private CachingStrategy cachingStrategy; + + /** + * Socket address to use to connect to peer. + */ + private InetSocketAddress inetSocketAddress; + + /** + * Remote peer factory to use to create a peer, if needed. + */ + private RemotePeerFactory remotePeerFactory; + + /** + * UserGroupInformation to use for legacy block reader local objects, + * if needed. + */ + private UserGroupInformation userGroupInformation; + + /** + * Configuration to use for legacy block reader local objects, if needed. + */ + private Configuration configuration; + + /** + * The HTrace tracer to use. 
+ */ + private Tracer tracer; + + /** + * Information about the domain socket path we should use to connect to the + * local peer-- or null if we haven't examined the local domain socket. + */ + private DomainSocketFactory.PathInfo pathInfo; + + /** + * The remaining number of times that we'll try to pull a socket out of the + * cache. + */ + private int remainingCacheTries; + + public BlockReaderFactory(DfsClientConf conf) { + this.conf = conf; + this.remainingCacheTries = conf.getNumCachedConnRetry(); + } + + public BlockReaderFactory setFileName(String fileName) { + this.fileName = fileName; + return this; + } + + public BlockReaderFactory setBlock(ExtendedBlock block) { + this.block = block; + return this; + } + + public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) { + this.token = token; + return this; + } + + public BlockReaderFactory setStartOffset(long startOffset) { + this.startOffset = startOffset; + return this; + } + + public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) { + this.verifyChecksum = verifyChecksum; + return this; + } + + public BlockReaderFactory setClientName(String clientName) { + this.clientName = clientName; + return this; + } + + public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) { + this.datanode = datanode; + return this; + } + + public BlockReaderFactory setStorageType(StorageType storageType) { + this.storageType = storageType; + return this; + } + + public BlockReaderFactory setAllowShortCircuitLocalReads( + boolean allowShortCircuitLocalReads) { + this.allowShortCircuitLocalReads = allowShortCircuitLocalReads; + return this; + } + + public BlockReaderFactory setClientCacheContext( + ClientContext clientContext) { + this.clientContext = clientContext; + return this; + } + + public BlockReaderFactory setLength(long length) { + this.length = length; + return this; + } + + public BlockReaderFactory setCachingStrategy( + CachingStrategy cachingStrategy) { + this.cachingStrategy = cachingStrategy; + return this; + } + + public BlockReaderFactory setInetSocketAddress ( + InetSocketAddress inetSocketAddress) { + this.inetSocketAddress = inetSocketAddress; + return this; + } + + public BlockReaderFactory setUserGroupInformation( + UserGroupInformation userGroupInformation) { + this.userGroupInformation = userGroupInformation; + return this; + } + + public BlockReaderFactory setRemotePeerFactory( + RemotePeerFactory remotePeerFactory) { + this.remotePeerFactory = remotePeerFactory; + return this; + } + + public BlockReaderFactory setConfiguration( + Configuration configuration) { + this.configuration = configuration; + return this; + } + + public BlockReaderFactory setTracer(Tracer tracer) { + this.tracer = tracer; + return this; + } + + @VisibleForTesting + public static void setFailureInjectorForTesting(FailureInjector injector) { + failureInjector = injector; + } + + /** + * Build a BlockReader with the given options. + * + * This function will do the best it can to create a block reader that meets + * all of our requirements. We prefer short-circuit block readers + * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the + * former avoid the overhead of socket communication. If short-circuit is + * unavailable, our next fallback is data transfer over UNIX domain sockets, + * if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't + * work, we will try to create a remote block reader that operates over TCP + * sockets. 
+ * + * There are a few caches that are important here. + * + * The ShortCircuitCache stores file descriptor objects which have been passed + * from the DataNode. + * + * The DomainSocketFactory stores information about UNIX domain socket paths + * that we not been able to use in the past, so that we don't waste time + * retrying them over and over. (Like all the caches, it does have a timeout, + * though.) + * + * The PeerCache stores peers that we have used in the past. If we can reuse + * one of these peers, we avoid the overhead of re-opening a socket. However, + * if the socket has been timed out on the remote end, our attempt to reuse + * the socket may end with an IOException. For that reason, we limit our + * attempts at socket reuse to dfs.client.cached.conn.retry times. After + * that, we create new sockets. This avoids the problem where a thread tries + * to talk to a peer that it hasn't talked to in a while, and has to clean out + * every entry in a socket cache full of stale entries. + * + * @return The new BlockReader. We will not return null. + * + * @throws InvalidToken + * If the block token was invalid. + * InvalidEncryptionKeyException + * If the encryption key was invalid. + * Other IOException + * If there was another problem. + */ + public BlockReader build() throws IOException { + Preconditions.checkNotNull(configuration); + Preconditions + .checkState(length >= 0, "Length must be set to a non-negative value"); + BlockReader reader = tryToCreateExternalBlockReader(); + if (reader != null) { + return reader; + } + final ShortCircuitConf scConf = conf.getShortCircuitConf(); + if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) { + if (clientContext.getUseLegacyBlockReaderLocal()) { + reader = getLegacyBlockReaderLocal(); + if (reader != null) { + LOG.trace("{}: returning new legacy block reader local.", this); + return reader; + } + } else { + reader = getBlockReaderLocal(); + if (reader != null) { + LOG.trace("{}: returning new block reader local.", this); + return reader; + } + } + } + if (scConf.isDomainSocketDataTraffic()) { + reader = getRemoteBlockReaderFromDomain(); + if (reader != null) { + LOG.trace("{}: returning new remote block reader using UNIX domain " + + "socket on {}", this, pathInfo.getPath()); + return reader; + } + } + Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting, + "TCP reads were disabled for testing, but we failed to " + + "do a non-TCP read."); + return getRemoteBlockReaderFromTcp(); + } + + private BlockReader tryToCreateExternalBlockReader() { + List<Class<? extends ReplicaAccessorBuilder>> clses = + conf.getReplicaAccessorBuilderClasses(); + for (Class<? extends ReplicaAccessorBuilder> cls : clses) { + try { + ByteArrayDataOutput bado = ByteStreams.newDataOutput(); + token.write(bado); + byte tokenBytes[] = bado.toByteArray(); + + Constructor<? extends ReplicaAccessorBuilder> ctor = + cls.getConstructor(); + ReplicaAccessorBuilder builder = ctor.newInstance(); + long visibleLength = startOffset + length; + ReplicaAccessor accessor = builder. + setAllowShortCircuitReads(allowShortCircuitLocalReads). + setBlock(block.getBlockId(), block.getBlockPoolId()). + setGenerationStamp(block.getGenerationStamp()). + setBlockAccessToken(tokenBytes). + setClientName(clientName). + setConfiguration(configuration). + setFileName(fileName). + setVerifyChecksum(verifyChecksum). + setVisibleLength(visibleLength). 
+ build(); + if (accessor == null) { + LOG.trace("{}: No ReplicaAccessor created by {}", + this, cls.getName()); + } else { + return new ExternalBlockReader(accessor, visibleLength, startOffset); + } + } catch (Throwable t) { + LOG.warn("Failed to construct new object of type " + + cls.getName(), t); + } + } + return null; + } + + + /** + * Get {@link BlockReaderLocalLegacy} for short circuited local reads. + * This block reader implements the path-based style of local reads + * first introduced in HDFS-2246. + */ + private BlockReader getLegacyBlockReaderLocal() throws IOException { + LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this); + if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) { + LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address" + + "{} is not local", this, inetSocketAddress); + return null; + } + if (clientContext.getDisableLegacyBlockReaderLocal()) { + PerformanceAdvisory.LOG.debug("{}: can't construct " + + "BlockReaderLocalLegacy because " + + "disableLegacyBlockReaderLocal is set.", this); + return null; + } + IOException ioe; + try { + return BlockReaderLocalLegacy.newBlockReader(conf, + userGroupInformation, configuration, fileName, block, token, + datanode, startOffset, length, storageType, tracer); + } catch (RemoteException remoteException) { + ioe = remoteException.unwrapRemoteException( + InvalidToken.class, AccessControlException.class); + } catch (IOException e) { + ioe = e; + } + if ((!(ioe instanceof AccessControlException)) && + isSecurityException(ioe)) { + // Handle security exceptions. + // We do not handle AccessControlException here, since + // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate + // that the user is not in dfs.block.local-path-access.user, a condition + // which requires us to disable legacy SCR. + throw ioe; + } + LOG.warn(this + ": error creating legacy BlockReaderLocal. " + + "Disabling legacy local reads.", ioe); + clientContext.setDisableLegacyBlockReaderLocal(); + return null; + } + + private BlockReader getBlockReaderLocal() throws InvalidToken { + LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit " + + " reads.", this); + if (pathInfo == null) { + pathInfo = clientContext.getDomainSocketFactory() + .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); + } + if (!pathInfo.getPathState().getUsableForShortCircuit()) { + PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " + + "giving up on BlockReaderLocal.", this, pathInfo); + return null; + } + ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), + block.getBlockPoolId()); + ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); + InvalidToken exc = info.getInvalidTokenException(); + if (exc != null) { + LOG.trace("{}: got InvalidToken exception while trying to construct " + + "BlockReaderLocal via {}", this, pathInfo.getPath()); + throw exc; + } + if (info.getReplica() == null) { + PerformanceAdvisory.LOG.debug("{}: failed to get " + + "ShortCircuitReplica. Cannot construct " + + "BlockReaderLocal via {}", this, pathInfo.getPath()); + return null; + } + return new BlockReaderLocal.Builder(conf.getShortCircuitConf()). + setFilename(fileName). + setBlock(block). + setStartOffset(startOffset). + setShortCircuitReplica(info.getReplica()). + setVerifyChecksum(verifyChecksum). + setCachingStrategy(cachingStrategy). + setStorageType(storageType). + setTracer(tracer). 
+ build(); + } + + /** + * Fetch a pair of short-circuit block descriptors from a local DataNode. + * + * @return Null if we could not communicate with the datanode, + * a new ShortCircuitReplicaInfo object otherwise. + * ShortCircuitReplicaInfo objects may contain either an + * InvalidToken exception, or a ShortCircuitReplica object ready to + * use. + */ + @Override + public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { + if (createShortCircuitReplicaInfoCallback != null) { + ShortCircuitReplicaInfo info = + createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo(); + if (info != null) return info; + } + LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this); + BlockReaderPeer curPeer; + while (true) { + curPeer = nextDomainPeer(); + if (curPeer == null) break; + if (curPeer.fromCache) remainingCacheTries--; + DomainPeer peer = (DomainPeer)curPeer.peer; + Slot slot = null; + ShortCircuitCache cache = clientContext.getShortCircuitCache(); + try { + MutableBoolean usedPeer = new MutableBoolean(false); + slot = cache.allocShmSlot(datanode, peer, usedPeer, + new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()), + clientName); + if (usedPeer.booleanValue()) { + LOG.trace("{}: allocShmSlot used up our previous socket {}. " + + "Allocating a new one...", this, peer.getDomainSocket()); + curPeer = nextDomainPeer(); + if (curPeer == null) break; + peer = (DomainPeer)curPeer.peer; + } + ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot); + clientContext.getPeerCache().put(datanode, peer); + return info; + } catch (IOException e) { + if (slot != null) { + cache.freeSlot(slot); + } + if (curPeer.fromCache) { + // Handle an I/O error we got when using a cached socket. + // These are considered less serious, because the socket may be stale. + LOG.debug("{}: closing stale domain peer {}", this, peer, e); + IOUtilsClient.cleanup(LOG, peer); + } else { + // Handle an I/O error we got when using a newly created socket. + // We temporarily disable the domain socket path for a few minutes in + // this case, to prevent wasting more time on it. + LOG.warn(this + ": I/O error requesting file descriptors. " + + "Disabling domain socket " + peer.getDomainSocket(), e); + IOUtilsClient.cleanup(LOG, peer); + clientContext.getDomainSocketFactory() + .disableDomainSocketPath(pathInfo.getPath()); + return null; + } + } + } + return null; + } + + /** + * Request file descriptors from a DomainPeer. + * + * @param peer The peer to use for communication. + * @param slot If non-null, the shared memory slot to associate with the + * new ShortCircuitReplica. + * + * @return A ShortCircuitReplica object if we could communicate with the + * datanode; null, otherwise. + * @throws IOException If we encountered an I/O exception while communicating + * with the datanode. + */ + private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, + Slot slot) throws IOException { + ShortCircuitCache cache = clientContext.getShortCircuitCache(); + final DataOutputStream out = + new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); + SlotId slotId = slot == null ? 
null : slot.getSlotId(); + new Sender(out).requestShortCircuitFds(block, token, slotId, 1, + failureInjector.getSupportsReceiptVerification()); + DataInputStream in = new DataInputStream(peer.getInputStream()); + BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(in)); + DomainSocket sock = peer.getDomainSocket(); + failureInjector.injectRequestFileDescriptorsFailure(); + switch (resp.getStatus()) { + case SUCCESS: + byte buf[] = new byte[1]; + FileInputStream[] fis = new FileInputStream[2]; + sock.recvFileInputStreams(fis, buf, 0, buf.length); + ShortCircuitReplica replica = null; + try { + ExtendedBlockId key = + new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); + if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) { + LOG.trace("Sending receipt verification byte for slot {}", slot); + sock.getOutputStream().write(0); + } + replica = new ShortCircuitReplica(key, fis[0], fis[1], cache, + Time.monotonicNow(), slot); + return new ShortCircuitReplicaInfo(replica); + } catch (IOException e) { + // This indicates an error reading from disk, or a format error. Since + // it's not a socket communication problem, we return null rather than + // throwing an exception. + LOG.warn(this + ": error creating ShortCircuitReplica.", e); + return null; + } finally { + if (replica == null) { + IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]); + } + } + case ERROR_UNSUPPORTED: + if (!resp.hasShortCircuitAccessVersion()) { + LOG.warn("short-circuit read access is disabled for " + + "DataNode " + datanode + ". reason: " + resp.getMessage()); + clientContext.getDomainSocketFactory() + .disableShortCircuitForPath(pathInfo.getPath()); + } else { + LOG.warn("short-circuit read access for the file " + + fileName + " is disabled for DataNode " + datanode + + ". reason: " + resp.getMessage()); + } + return null; + case ERROR_ACCESS_TOKEN: + String msg = "access control error while " + + "attempting to set up short-circuit access to " + + fileName + resp.getMessage(); + LOG.debug("{}:{}", this, msg); + return new ShortCircuitReplicaInfo(new InvalidToken(msg)); + default: + LOG.warn(this + ": unknown response code " + resp.getStatus() + + " while attempting to set up short-circuit access. " + + resp.getMessage()); + clientContext.getDomainSocketFactory() + .disableShortCircuitForPath(pathInfo.getPath()); + return null; + } + } + + /** + * Get a BlockReaderRemote that communicates over a UNIX domain socket. + * + * @return The new BlockReader, or null if we failed to create the block + * reader. + * + * @throws InvalidToken If the block token was invalid. + * Potentially other security-related execptions. 
+ */ + private BlockReader getRemoteBlockReaderFromDomain() throws IOException { + if (pathInfo == null) { + pathInfo = clientContext.getDomainSocketFactory() + .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); + } + if (!pathInfo.getPathState().getUsableForDataTransfer()) { + PerformanceAdvisory.LOG.debug("{}: not trying to create a " + + "remote block reader because the UNIX domain socket at {}" + + " is not usable.", this, pathInfo); + return null; + } + LOG.trace("{}: trying to create a remote block reader from the UNIX domain " + + "socket at {}", this, pathInfo.getPath()); + + while (true) { + BlockReaderPeer curPeer = nextDomainPeer(); + if (curPeer == null) break; + if (curPeer.fromCache) remainingCacheTries--; + DomainPeer peer = (DomainPeer)curPeer.peer; + BlockReader blockReader = null; + try { + blockReader = getRemoteBlockReader(peer); + return blockReader; + } catch (IOException ioe) { + IOUtilsClient.cleanup(LOG, peer); + if (isSecurityException(ioe)) { + LOG.trace("{}: got security exception while constructing a remote " + + " block reader from the unix domain socket at {}", + this, pathInfo.getPath(), ioe); + throw ioe; + } + if (curPeer.fromCache) { + // Handle an I/O error we got when using a cached peer. These are + // considered less serious because the underlying socket may be stale. + LOG.debug("Closed potentially stale domain peer {}", peer, ioe); + } else { + // Handle an I/O error we got when using a newly created domain peer. + // We temporarily disable the domain socket path for a few minutes in + // this case, to prevent wasting more time on it. + LOG.warn("I/O error constructing remote block reader. Disabling " + + "domain socket " + peer.getDomainSocket(), ioe); + clientContext.getDomainSocketFactory() + .disableDomainSocketPath(pathInfo.getPath()); + return null; + } + } finally { + if (blockReader == null) { + IOUtilsClient.cleanup(LOG, peer); + } + } + } + return null; + } + + /** + * Get a BlockReaderRemote that communicates over a TCP socket. + * + * @return The new BlockReader. We will not return null, but instead throw + * an exception if this fails. + * + * @throws InvalidToken + * If the block token was invalid. + * InvalidEncryptionKeyException + * If the encryption key was invalid. + * Other IOException + * If there was another problem. + */ + private BlockReader getRemoteBlockReaderFromTcp() throws IOException { + LOG.trace("{}: trying to create a remote block reader from a TCP socket", + this); + BlockReader blockReader = null; + while (true) { + BlockReaderPeer curPeer = null; + Peer peer = null; + try { + curPeer = nextTcpPeer(); + if (curPeer.fromCache) remainingCacheTries--; + peer = curPeer.peer; + blockReader = getRemoteBlockReader(peer); + return blockReader; + } catch (IOException ioe) { + if (isSecurityException(ioe)) { + LOG.trace("{}: got security exception while constructing a remote " + + "block reader from {}", this, peer, ioe); + throw ioe; + } + if ((curPeer != null) && curPeer.fromCache) { + // Handle an I/O error we got when using a cached peer. These are + // considered less serious, because the underlying socket may be + // stale. + LOG.debug("Closed potentially stale remote peer {}", peer, ioe); + } else { + // Handle an I/O error we got when using a newly created peer. 
+ LOG.warn("I/O error constructing remote block reader.", ioe); + throw ioe; + } + } finally { + if (blockReader == null) { + IOUtilsClient.cleanup(LOG, peer); + } + } + } + } + + public static class BlockReaderPeer { + final Peer peer; + final boolean fromCache; + + BlockReaderPeer(Peer peer, boolean fromCache) { + this.peer = peer; + this.fromCache = fromCache; + } + } + + /** + * Get the next DomainPeer-- either from the cache or by creating it. + * + * @return the next DomainPeer, or null if we could not construct one. + */ + private BlockReaderPeer nextDomainPeer() { + if (remainingCacheTries > 0) { + Peer peer = clientContext.getPeerCache().get(datanode, true); + if (peer != null) { + LOG.trace("nextDomainPeer: reusing existing peer {}", peer); + return new BlockReaderPeer(peer, true); + } + } + DomainSocket sock = clientContext.getDomainSocketFactory(). + createSocket(pathInfo, conf.getSocketTimeout()); + if (sock == null) return null; + return new BlockReaderPeer(new DomainPeer(sock), false); + } + + /** + * Get the next TCP-based peer-- either from the cache or by creating it. + * + * @return the next Peer, or null if we could not construct one. + * + * @throws IOException If there was an error while constructing the peer + * (such as an InvalidEncryptionKeyException) + */ + private BlockReaderPeer nextTcpPeer() throws IOException { + if (remainingCacheTries > 0) { + Peer peer = clientContext.getPeerCache().get(datanode, false); + if (peer != null) { + LOG.trace("nextTcpPeer: reusing existing peer {}", peer); + return new BlockReaderPeer(peer, true); + } + } + try { + Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token, + datanode); + LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer); + return new BlockReaderPeer(peer, false); + } catch (IOException e) { + LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to" + + "{}", datanode); + throw e; + } + } + + /** + * Determine if an exception is security-related. + * + * We need to handle these exceptions differently than other IOExceptions. + * They don't indicate a communication problem. Instead, they mean that there + * is some action the client needs to take, such as refetching block tokens, + * renewing encryption keys, etc. + * + * @param ioe The exception + * @return True only if the exception is security-related. 
+ */ + private static boolean isSecurityException(IOException ioe) { + return (ioe instanceof InvalidToken) || + (ioe instanceof InvalidEncryptionKeyException) || + (ioe instanceof InvalidBlockTokenException) || + (ioe instanceof AccessControlException); + } + + @SuppressWarnings("deprecation") + private BlockReader getRemoteBlockReader(Peer peer) throws IOException { + int networkDistance = clientContext.getNetworkDistance(datanode); + if (conf.getShortCircuitConf().isUseLegacyBlockReader()) { + return BlockReaderRemote.newBlockReader(fileName, + block, token, startOffset, length, conf.getIoBufferSize(), + verifyChecksum, clientName, peer, datanode, + clientContext.getPeerCache(), cachingStrategy, tracer, + networkDistance); + } else { + return BlockReaderRemote2.newBlockReader( + fileName, block, token, startOffset, length, + verifyChecksum, clientName, peer, datanode, + clientContext.getPeerCache(), cachingStrategy, tracer, + networkDistance); + } + } + + @Override + public String toString() { + return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")"; + } + + /** + * File name to print when accessing a block directly (from servlets) + * @param s Address of the block location + * @param poolId Block pool ID of the block + * @param blockId Block ID of the block + * @return string that has a file name for debug purposes + */ + public static String getFileName(final InetSocketAddress s, + final String poolId, final long blockId) { + return s.toString() + ":" + poolId + ":" + blockId; + } +}
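For context, the setters added in BlockReaderFactory above are meant to be chained by a caller such as DFSInputStream. Below is a minimal usage sketch, not part of the patch: the wrapper method name is invented, and all of its parameters are assumed to come from an already-initialized DFSClient; only the BlockReaderFactory calls themselves are taken from the diff.

    // Illustrative only: assembling a BlockReader with the builder-style
    // setters added in this patch. Everything passed in is assumed to come
    // from an open DFSClient; the method name is hypothetical.
    private BlockReader newShortCircuitCapableReader(
        DfsClientConf conf, Configuration configuration, ClientContext clientContext,
        String src, ExtendedBlock block, Token<BlockTokenIdentifier> token,
        DatanodeInfo datanode, StorageType storageType, InetSocketAddress addr,
        RemotePeerFactory peerFactory, UserGroupInformation ugi,
        CachingStrategy cachingStrategy, Tracer tracer, String clientName,
        long offsetIntoBlock, long bytesToRead) throws IOException {
      return new BlockReaderFactory(conf)
          .setFileName(src)                      // for logging/debugging only
          .setBlock(block)
          .setBlockToken(token)
          .setStartOffset(offsetIntoBlock)
          .setLength(bytesToRead)                // must be non-negative or build() fails
          .setVerifyChecksum(true)
          .setClientName(clientName)
          .setDatanodeInfo(datanode)
          .setStorageType(storageType)
          .setAllowShortCircuitLocalReads(true)
          .setClientCacheContext(clientContext)  // supplies ShortCircuitCache and PeerCache
          .setCachingStrategy(cachingStrategy)
          .setInetSocketAddress(addr)
          .setRemotePeerFactory(peerFactory)
          .setUserGroupInformation(ugi)
          .setConfiguration(configuration)       // checked non-null in build()
          .setTracer(tracer)
          .build();                              // never returns null; throws on failure
    }

As the build() javadoc above describes, the factory tries an external ReplicaAccessor first, then the short-circuit local readers, then a remote reader over a UNIX domain socket, and only falls back to a TCP block reader last.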
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java new file mode 100644 index 0000000..1b38996 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java @@ -0,0 +1,725 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DirectBufferPool; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * BlockReaderLocal enables local short circuited reads. If the DFS client is on + * the same machine as the datanode, then the client can read files directly + * from the local file system rather than going through the datanode for better + * performance. <br> + * {@link BlockReaderLocal} works as follows: + * <ul> + * <li>The client performing short circuit reads must be configured at the + * datanode.</li> + * <li>The client gets the file descriptors for the metadata file and the data + * file for the block using + * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}. 
+ * </li> + * <li>The client reads the file descriptors.</li> + * </ul> + */ [email protected] +class BlockReaderLocal implements BlockReader { + static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class); + + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + + public static class Builder { + private final int bufferSize; + private boolean verifyChecksum; + private int maxReadahead; + private String filename; + private ShortCircuitReplica replica; + private long dataPos; + private ExtendedBlock block; + private StorageType storageType; + private Tracer tracer; + + public Builder(ShortCircuitConf conf) { + this.maxReadahead = Integer.MAX_VALUE; + this.verifyChecksum = !conf.isSkipShortCircuitChecksums(); + this.bufferSize = conf.getShortCircuitBufferSize(); + } + + public Builder setVerifyChecksum(boolean verifyChecksum) { + this.verifyChecksum = verifyChecksum; + return this; + } + + public Builder setCachingStrategy(CachingStrategy cachingStrategy) { + long readahead = cachingStrategy.getReadahead() != null ? + cachingStrategy.getReadahead() : + HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT; + this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead); + return this; + } + + public Builder setFilename(String filename) { + this.filename = filename; + return this; + } + + public Builder setShortCircuitReplica(ShortCircuitReplica replica) { + this.replica = replica; + return this; + } + + public Builder setStartOffset(long startOffset) { + this.dataPos = Math.max(0, startOffset); + return this; + } + + public Builder setBlock(ExtendedBlock block) { + this.block = block; + return this; + } + + public Builder setStorageType(StorageType storageType) { + this.storageType = storageType; + return this; + } + + public Builder setTracer(Tracer tracer) { + this.tracer = tracer; + return this; + } + + public BlockReaderLocal build() { + Preconditions.checkNotNull(replica); + return new BlockReaderLocal(this); + } + } + + private boolean closed = false; + + /** + * Pair of streams for this block. + */ + private final ShortCircuitReplica replica; + + /** + * The data FileChannel. + */ + private final FileChannel dataIn; + + /** + * The next place we'll read from in the block data FileChannel. + * + * If data is buffered in dataBuf, this offset will be larger than the + * offset of the next byte which a read() operation will give us. + */ + private long dataPos; + + /** + * The Checksum FileChannel. + */ + private final FileChannel checksumIn; + + /** + * Checksum type and size. + */ + private final DataChecksum checksum; + + /** + * If false, we will always skip the checksum. + */ + private final boolean verifyChecksum; + + /** + * Name of the block, for logging purposes. + */ + private final String filename; + + /** + * Block ID and Block Pool ID. + */ + private final ExtendedBlock block; + + /** + * Cache of Checksum#bytesPerChecksum. + */ + private final int bytesPerChecksum; + + /** + * Cache of Checksum#checksumSize. + */ + private final int checksumSize; + + /** + * Maximum number of chunks to allocate. + * + * This is used to allocate dataBuf and checksumBuf, in the event that + * we need them. + */ + private final int maxAllocatedChunks; + + /** + * True if zero readahead was requested. + */ + private final boolean zeroReadaheadRequested; + + /** + * Maximum amount of readahead we'll do. This will always be at least the, + * size of a single chunk, even if {@link #zeroReadaheadRequested} is true. 
+ * The reason is because we need to do a certain amount of buffering in order + * to do checksumming. + * + * This determines how many bytes we'll use out of dataBuf and checksumBuf. + * Why do we allocate buffers, and then (potentially) only use part of them? + * The rationale is that allocating a lot of buffers of different sizes would + * make it very difficult for the DirectBufferPool to re-use buffers. + */ + private final int maxReadaheadLength; + + /** + * Buffers data starting at the current dataPos and extending on + * for dataBuf.limit(). + * + * This may be null if we don't need it. + */ + private ByteBuffer dataBuf; + + /** + * Buffers checksums starting at the current checksumPos and extending on + * for checksumBuf.limit(). + * + * This may be null if we don't need it. + */ + private ByteBuffer checksumBuf; + + /** + * StorageType of replica on DataNode. + */ + private StorageType storageType; + + /** + * The Tracer to use. + */ + private final Tracer tracer; + + private BlockReaderLocal(Builder builder) { + this.replica = builder.replica; + this.dataIn = replica.getDataStream().getChannel(); + this.dataPos = builder.dataPos; + this.checksumIn = replica.getMetaStream().getChannel(); + BlockMetadataHeader header = builder.replica.getMetaHeader(); + this.checksum = header.getChecksum(); + this.verifyChecksum = builder.verifyChecksum && + (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL); + this.filename = builder.filename; + this.block = builder.block; + this.bytesPerChecksum = checksum.getBytesPerChecksum(); + this.checksumSize = checksum.getChecksumSize(); + + this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 : + ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum); + // Calculate the effective maximum readahead. + // We can't do more readahead than there is space in the buffer. + int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 : + ((Math.min(builder.bufferSize, builder.maxReadahead) + + bytesPerChecksum - 1) / bytesPerChecksum); + if (maxReadaheadChunks == 0) { + this.zeroReadaheadRequested = true; + maxReadaheadChunks = 1; + } else { + this.zeroReadaheadRequested = false; + } + this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum; + this.storageType = builder.storageType; + this.tracer = builder.tracer; + } + + private synchronized void createDataBufIfNeeded() { + if (dataBuf == null) { + dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum); + dataBuf.position(0); + dataBuf.limit(0); + } + } + + private synchronized void freeDataBufIfExists() { + if (dataBuf != null) { + // When disposing of a dataBuf, we have to move our stored file index + // backwards. + dataPos -= dataBuf.remaining(); + dataBuf.clear(); + bufferPool.returnBuffer(dataBuf); + dataBuf = null; + } + } + + private synchronized void createChecksumBufIfNeeded() { + if (checksumBuf == null) { + checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize); + checksumBuf.position(0); + checksumBuf.limit(0); + } + } + + private synchronized void freeChecksumBufIfExists() { + if (checksumBuf != null) { + checksumBuf.clear(); + bufferPool.returnBuffer(checksumBuf); + checksumBuf = null; + } + } + + private synchronized int drainDataBuf(ByteBuffer buf) { + if (dataBuf == null) return -1; + int oldLimit = dataBuf.limit(); + int nRead = Math.min(dataBuf.remaining(), buf.remaining()); + if (nRead == 0) { + return (dataBuf.remaining() == 0) ? 
-1 : 0; + } + try { + dataBuf.limit(dataBuf.position() + nRead); + buf.put(dataBuf); + } finally { + dataBuf.limit(oldLimit); + } + return nRead; + } + + /** + * Read from the block file into a buffer. + * + * This function overwrites checksumBuf. It will increment dataPos. + * + * @param buf The buffer to read into. May be dataBuf. + * The position and limit of this buffer should be set to + * multiples of the checksum size. + * @param canSkipChecksum True if we can skip checksumming. + * + * @return Total bytes read. 0 on EOF. + */ + private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum) + throws IOException { + try (TraceScope ignored = tracer.newScope( + "BlockReaderLocal#fillBuffer(" + block.getBlockId() + ")")) { + int total = 0; + long startDataPos = dataPos; + int startBufPos = buf.position(); + while (buf.hasRemaining()) { + int nRead = dataIn.read(buf, dataPos); + if (nRead < 0) { + break; + } + dataPos += nRead; + total += nRead; + } + if (canSkipChecksum) { + freeChecksumBufIfExists(); + return total; + } + if (total > 0) { + try { + buf.limit(buf.position()); + buf.position(startBufPos); + createChecksumBufIfNeeded(); + int checksumsNeeded = (total + bytesPerChecksum - 1) / + bytesPerChecksum; + checksumBuf.clear(); + checksumBuf.limit(checksumsNeeded * checksumSize); + long checksumPos = BlockMetadataHeader.getHeaderSize() + + ((startDataPos / bytesPerChecksum) * checksumSize); + while (checksumBuf.hasRemaining()) { + int nRead = checksumIn.read(checksumBuf, checksumPos); + if (nRead < 0) { + throw new IOException("Got unexpected checksum file EOF at " + + checksumPos + ", block file position " + startDataPos + + " for block " + block + " of file " + filename); + } + checksumPos += nRead; + } + checksumBuf.flip(); + + checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos); + } finally { + buf.position(buf.limit()); + } + } + return total; + } + } + + private boolean createNoChecksumContext() { + return !verifyChecksum || + // Checksums are not stored for replicas on transient storage. We do + // not anchor, because we do not intend for client activity to block + // eviction from transient storage on the DataNode side. 
+ (storageType != null && storageType.isTransient()) || + replica.addNoChecksumAnchor(); + } + + private void releaseNoChecksumContext() { + if (verifyChecksum) { + if (storageType == null || !storageType.isTransient()) { + replica.removeNoChecksumAnchor(); + } + } + } + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + boolean canSkipChecksum = createNoChecksumContext(); + try { + String traceFormatStr = "read(buf.remaining={}, block={}, filename={}, " + + "canSkipChecksum={})"; + LOG.trace(traceFormatStr + ": starting", + buf.remaining(), block, filename, canSkipChecksum); + int nRead; + try { + if (canSkipChecksum && zeroReadaheadRequested) { + nRead = readWithoutBounceBuffer(buf); + } else { + nRead = readWithBounceBuffer(buf, canSkipChecksum); + } + } catch (IOException e) { + LOG.trace(traceFormatStr + ": I/O error", + buf.remaining(), block, filename, canSkipChecksum, e); + throw e; + } + LOG.trace(traceFormatStr + ": returning {}", + buf.remaining(), block, filename, canSkipChecksum, nRead); + return nRead; + } finally { + if (canSkipChecksum) releaseNoChecksumContext(); + } + } + + private synchronized int readWithoutBounceBuffer(ByteBuffer buf) + throws IOException { + freeDataBufIfExists(); + freeChecksumBufIfExists(); + int total = 0; + while (buf.hasRemaining()) { + int nRead = dataIn.read(buf, dataPos); + if (nRead <= 0) break; + dataPos += nRead; + total += nRead; + } + return (total == 0 && (dataPos == dataIn.size())) ? -1 : total; + } + + /** + * Fill the data buffer. If necessary, validate the data against the + * checksums. + * + * We always want the offsets of the data contained in dataBuf to be + * aligned to the chunk boundary. If we are validating checksums, we + * accomplish this by seeking backwards in the file until we're on a + * chunk boundary. (This is necessary because we can't checksum a + * partial chunk.) If we are not validating checksums, we simply only + * fill the latter part of dataBuf. + * + * @param canSkipChecksum true if we can skip checksumming. + * @return true if we hit EOF. + * @throws IOException + */ + private synchronized boolean fillDataBuf(boolean canSkipChecksum) + throws IOException { + createDataBufIfNeeded(); + final int slop = (int)(dataPos % bytesPerChecksum); + final long oldDataPos = dataPos; + dataBuf.limit(maxReadaheadLength); + if (canSkipChecksum) { + dataBuf.position(slop); + fillBuffer(dataBuf, true); + } else { + dataPos -= slop; + dataBuf.position(0); + fillBuffer(dataBuf, false); + } + dataBuf.limit(dataBuf.position()); + dataBuf.position(Math.min(dataBuf.position(), slop)); + LOG.trace("loaded {} bytes into bounce buffer from offset {} of {}", + dataBuf.remaining(), oldDataPos, block); + return dataBuf.limit() != maxReadaheadLength; + } + + /** + * Read using the bounce buffer. + * + * A 'direct' read actually has three phases. The first drains any + * remaining bytes from the slow read buffer. After this the read is + * guaranteed to be on a checksum chunk boundary. If there are still bytes + * to read, the fast direct path is used for as many remaining bytes as + * possible, up to a multiple of the checksum chunk size. Finally, any + * 'odd' bytes remaining at the end of the read cause another slow read to + * be issued, which involves an extra copy. + * + * Every 'slow' read tries to fill the slow read buffer in one go for + * efficiency's sake. As described above, all non-checksum-chunk-aligned + * reads will be served from the slower read path. 
+ * + * @param buf The buffer to read into. + * @param canSkipChecksum True if we can skip checksums. + */ + private synchronized int readWithBounceBuffer(ByteBuffer buf, + boolean canSkipChecksum) throws IOException { + int total = 0; + int bb = drainDataBuf(buf); // drain bounce buffer if possible + if (bb >= 0) { + total += bb; + if (buf.remaining() == 0) return total; + } + boolean eof = true, done = false; + do { + if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength) + && ((dataPos % bytesPerChecksum) == 0)) { + // Fast lane: try to read directly into user-supplied buffer, bypassing + // bounce buffer. + int oldLimit = buf.limit(); + int nRead; + try { + buf.limit(buf.position() + maxReadaheadLength); + nRead = fillBuffer(buf, canSkipChecksum); + } finally { + buf.limit(oldLimit); + } + if (nRead < maxReadaheadLength) { + done = true; + } + if (nRead > 0) { + eof = false; + } + total += nRead; + } else { + // Slow lane: refill bounce buffer. + if (fillDataBuf(canSkipChecksum)) { + done = true; + } + bb = drainDataBuf(buf); // drain bounce buffer if possible + if (bb >= 0) { + eof = false; + total += bb; + } + } + } while ((!done) && (buf.remaining() > 0)); + return (eof && total == 0) ? -1 : total; + } + + @Override + public synchronized int read(byte[] arr, int off, int len) + throws IOException { + boolean canSkipChecksum = createNoChecksumContext(); + int nRead; + try { + final String traceFormatStr = "read(arr.length={}, off={}, len={}, " + + "filename={}, block={}, canSkipChecksum={})"; + LOG.trace(traceFormatStr + ": starting", + arr.length, off, len, filename, block, canSkipChecksum); + try { + if (canSkipChecksum && zeroReadaheadRequested) { + nRead = readWithoutBounceBuffer(arr, off, len); + } else { + nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum); + } + } catch (IOException e) { + LOG.trace(traceFormatStr + ": I/O error", + arr.length, off, len, filename, block, canSkipChecksum, e); + throw e; + } + LOG.trace(traceFormatStr + ": returning {}", + arr.length, off, len, filename, block, canSkipChecksum, nRead); + } finally { + if (canSkipChecksum) releaseNoChecksumContext(); + } + return nRead; + } + + private synchronized int readWithoutBounceBuffer(byte arr[], int off, + int len) throws IOException { + freeDataBufIfExists(); + freeChecksumBufIfExists(); + int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos); + if (nRead > 0) { + dataPos += nRead; + } else if ((nRead == 0) && (dataPos == dataIn.size())) { + return -1; + } + return nRead; + } + + private synchronized int readWithBounceBuffer(byte arr[], int off, int len, + boolean canSkipChecksum) throws IOException { + createDataBufIfNeeded(); + if (!dataBuf.hasRemaining()) { + dataBuf.position(0); + dataBuf.limit(maxReadaheadLength); + fillDataBuf(canSkipChecksum); + } + if (dataBuf.remaining() == 0) return -1; + int toRead = Math.min(dataBuf.remaining(), len); + dataBuf.get(arr, off, toRead); + return toRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + int discardedFromBuf = 0; + long remaining = n; + if ((dataBuf != null) && dataBuf.hasRemaining()) { + discardedFromBuf = (int)Math.min(dataBuf.remaining(), n); + dataBuf.position(dataBuf.position() + discardedFromBuf); + remaining -= discardedFromBuf; + } + LOG.trace("skip(n={}, block={}, filename={}): discarded {} bytes from " + + "dataBuf and advanced dataPos by {}", + n, block, filename, discardedFromBuf, remaining); + dataPos += remaining; + return n; + } + + @Override + public int available() { + // 
We never do network I/O in BlockReaderLocal. + return Integer.MAX_VALUE; + } + + @Override + public synchronized void close() throws IOException { + if (closed) return; + closed = true; + LOG.trace("close(filename={}, block={})", filename, block); + replica.unref(); + freeDataBufIfExists(); + freeChecksumBufIfExists(); + } + + @Override + public synchronized void readFully(byte[] arr, int off, int len) + throws IOException { + BlockReaderUtil.readFully(this, arr, off, len); + } + + @Override + public synchronized int readAll(byte[] buf, int off, int len) + throws IOException { + return BlockReaderUtil.readAll(this, buf, off, len); + } + + @Override + public boolean isShortCircuit() { + return true; + } + + /** + * Get or create a memory map for this replica. + * + * There are two kinds of ClientMmap objects we could fetch here: one that + * will always read pre-checksummed data, and one that may read data that + * hasn't been checksummed. + * + * If we fetch the former, "safe" kind of ClientMmap, we have to increment + * the anchor count on the shared memory slot. This will tell the DataNode + * not to munlock the block until this ClientMmap is closed. + * If we fetch the latter, we don't bother with anchoring. + * + * @param opts The options to use, such as SKIP_CHECKSUMS. + * + * @return null on failure; the ClientMmap otherwise. + */ + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + boolean anchor = verifyChecksum && + !opts.contains(ReadOption.SKIP_CHECKSUMS); + if (anchor) { + if (!createNoChecksumContext()) { + LOG.trace("can't get an mmap for {} of {} since SKIP_CHECKSUMS was not " + + "given, we aren't skipping checksums, and the block is not " + + "mlocked.", block, filename); + return null; + } + } + ClientMmap clientMmap = null; + try { + clientMmap = replica.getOrCreateClientMmap(anchor); + } finally { + if ((clientMmap == null) && anchor) { + releaseNoChecksumContext(); + } + } + return clientMmap; + } + + @VisibleForTesting + boolean getVerifyChecksum() { + return this.verifyChecksum; + } + + @VisibleForTesting + int getMaxReadaheadLength() { + return this.maxReadaheadLength; + } + + /** + * Make the replica anchorable. Normally this can only be done by the + * DataNode. This method is only for testing. + */ + @VisibleForTesting + void forceAnchorable() { + replica.getSlot().makeAnchorable(); + } + + /** + * Make the replica unanchorable. Normally this can only be done by the + * DataNode. This method is only for testing. 
+ */ + @VisibleForTesting + void forceUnanchorable() { + replica.getSlot().makeUnanchorable(); + } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } + + @Override + public int getNetworkDistance() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java new file mode 100644 index 0000000..7d20a83 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java @@ -0,0 +1,745 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.hdfs.util.IOUtilsClient; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DirectBufferPool; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * BlockReaderLocalLegacy enables local short circuited reads. 
If the DFS client + * is on the same machine as the datanode, then the client can read files + * directly from the local file system rather than going through the datanode + * for better performance. <br> + * + * This is the legacy implementation based on HDFS-2246, which requires + * permissions on the datanode to be set so that clients can directly access the + * blocks. The new implementation based on HDFS-347 should be preferred on UNIX + * systems where the required native code has been implemented.<br> + * + * {@link BlockReaderLocalLegacy} works as follows: + * <ul> + * <li>The client performing short circuit reads must be configured at the + * datanode.</li> + * <li>The client gets the path to the file where block is stored using + * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)} + * RPC call</li> + * <li>Client uses kerberos authentication to connect to the datanode over RPC, + * if security is enabled.</li> + * </ul> + */ [email protected] +class BlockReaderLocalLegacy implements BlockReader { + private static final Logger LOG = LoggerFactory.getLogger( + BlockReaderLocalLegacy.class); + + //Stores the cache and proxy for a local datanode. + private static class LocalDatanodeInfo { + private ClientDatanodeProtocol proxy = null; + private final Map<ExtendedBlock, BlockLocalPathInfo> cache; + + LocalDatanodeInfo() { + final int cacheSize = 10000; + final float hashTableLoadFactor = 0.75f; + int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + + 1; + cache = Collections + .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>( + hashTableCapacity, hashTableLoadFactor, true) { + private static final long serialVersionUID = 1; + + @Override + protected boolean removeEldestEntry( + Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) { + return size() > cacheSize; + } + }); + } + + private synchronized ClientDatanodeProtocol getDatanodeProxy( + UserGroupInformation ugi, final DatanodeInfo node, + final Configuration conf, final int socketTimeout, + final boolean connectToDnViaHostname) throws IOException { + if (proxy == null) { + try { + proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() { + @Override + public ClientDatanodeProtocol run() throws Exception { + return DFSUtilClient.createClientDatanodeProtocolProxy(node, conf, + socketTimeout, connectToDnViaHostname); + } + }); + } catch (InterruptedException e) { + LOG.warn("encountered exception ", e); + } + } + return proxy; + } + + private synchronized void resetDatanodeProxy() { + if (null != proxy) { + RPC.stopProxy(proxy); + proxy = null; + } + } + + private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) { + return cache.get(b); + } + + private void setBlockLocalPathInfo(ExtendedBlock b, + BlockLocalPathInfo info) { + cache.put(b, info); + } + + private void removeBlockLocalPathInfo(ExtendedBlock b) { + cache.remove(b); + } + } + + // Multiple datanodes could be running on the local machine. Store proxies in + // a map keyed by the ipc port of the datanode. + private static final Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = + new HashMap<>(); + + private final FileInputStream dataIn; // reader for the data file + private final FileInputStream checksumIn; // reader for the checksum file + + /** + * Offset from the most recent chunk boundary at which the next read should + * take place. Is only set to non-zero at construction time, and is + * decremented (usually to 0) by subsequent reads. 
This avoids having to do a + * checksum read at construction to position the read cursor correctly. + */ + private int offsetFromChunkBoundary; + + private byte[] skipBuf = null; + + /** + * Used for checksummed reads that need to be staged before copying to their + * output buffer because they are either a) smaller than the checksum chunk + * size or b) issued by the slower read(byte[]...) path + */ + private ByteBuffer slowReadBuff = null; + private ByteBuffer checksumBuff = null; + private DataChecksum checksum; + private final boolean verifyChecksum; + + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + + private final int bytesPerChecksum; + private final int checksumSize; + + /** offset in block where reader wants to actually read */ + private long startOffset; + private final String filename; + private long blockId; + private final Tracer tracer; + + /** + * The only way this object can be instantiated. + */ + static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf, + UserGroupInformation userGroupInformation, + Configuration configuration, String file, ExtendedBlock blk, + Token<BlockTokenIdentifier> token, DatanodeInfo node, + long startOffset, long length, StorageType storageType, + Tracer tracer) throws IOException { + final ShortCircuitConf scConf = conf.getShortCircuitConf(); + LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node + .getIpcPort()); + // check the cache first + BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk); + if (pathinfo == null) { + if (userGroupInformation == null) { + userGroupInformation = UserGroupInformation.getCurrentUser(); + } + pathinfo = getBlockPathInfo(userGroupInformation, blk, node, + configuration, conf.getSocketTimeout(), token, + conf.isConnectToDnViaHostname(), storageType); + } + + // check to see if the file exists. It may so happen that the + // HDFS file has been deleted and this block-lookup is occurring + // on behalf of a new HDFS file. This time, the block file could + // be residing in a different portion of the fs.data.dir directory. + // In this case, we remove this entry from the cache. The next + // call to this method will re-populate the cache. 
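The path-info cache referenced in the comment above is a plain access-ordered LinkedHashMap whose removeEldestEntry hook evicts the least-recently-used entry once the map passes a fixed size. A minimal, self-contained sketch of that eviction pattern follows; the String keys and values and the cap of 2 are illustrative only and are not part of this patch.

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the eviction pattern used by LocalDatanodeInfo's cache: an
// access-ordered LinkedHashMap that drops its least-recently-used entry once
// the size exceeds a fixed cap. Types and names here are placeholders.
class LruCacheSketch {
  static <K, V> Map<K, V> create(final int maxEntries) {
    final float loadFactor = 0.75f;
    int capacity = (int) Math.ceil(maxEntries / loadFactor) + 1;
    return Collections.synchronizedMap(
        new LinkedHashMap<K, V>(capacity, loadFactor, true /* access order */) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxEntries; // evict the LRU entry beyond the cap
          }
        });
  }

  public static void main(String[] args) {
    Map<String, String> cache = create(2);
    cache.put("blk_1", "/data/blk_1");
    cache.put("blk_2", "/data/blk_2");
    cache.get("blk_1");                 // touch blk_1 so blk_2 becomes eldest
    cache.put("blk_3", "/data/blk_3");  // exceeds cap of 2, evicts blk_2
    System.out.println(cache.keySet()); // prints [blk_1, blk_3]
  }
}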
+ FileInputStream dataIn = null; + FileInputStream checksumIn = null; + BlockReaderLocalLegacy localBlockReader = null; + final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums() + || storageType.isTransient(); + try { + // get a local file system + File blkfile = new File(pathinfo.getBlockPath()); + dataIn = new FileInputStream(blkfile); + + LOG.debug("New BlockReaderLocalLegacy for file {} of size {} startOffset " + + "{} length {} short circuit checksum {}", + blkfile, blkfile.length(), startOffset, length, !skipChecksumCheck); + + if (!skipChecksumCheck) { + // get the metadata file + File metafile = new File(pathinfo.getMetaPath()); + checksumIn = new FileInputStream(metafile); + + final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( + new DataInputStream(checksumIn), blk); + long firstChunkOffset = startOffset + - (startOffset % checksum.getBytesPerChecksum()); + localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, + startOffset, checksum, true, dataIn, firstChunkOffset, checksumIn, + tracer); + } else { + localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, + startOffset, dataIn, tracer); + } + } catch (IOException e) { + // remove from cache + localDatanodeInfo.removeBlockLocalPathInfo(blk); + LOG.warn("BlockReaderLocalLegacy: Removing " + blk + + " from cache because local file " + pathinfo.getBlockPath() + + " could not be opened."); + throw e; + } finally { + if (localBlockReader == null) { + if (dataIn != null) { + dataIn.close(); + } + if (checksumIn != null) { + checksumIn.close(); + } + } + } + return localBlockReader; + } + + private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) { + LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port); + if (ldInfo == null) { + ldInfo = new LocalDatanodeInfo(); + localDatanodeInfoMap.put(port, ldInfo); + } + return ldInfo; + } + + private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi, + ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout, + Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname, + StorageType storageType) throws IOException { + LocalDatanodeInfo localDatanodeInfo = + getLocalDatanodeInfo(node.getIpcPort()); + BlockLocalPathInfo pathinfo; + ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node, + conf, timeout, connectToDnViaHostname); + try { + // make RPC to local datanode to find local pathnames of blocks + pathinfo = proxy.getBlockLocalPathInfo(blk, token); + // We can't cache the path information for a replica on transient storage. + // If the replica gets evicted, then it moves to a different path. Then, + // our next attempt to read from the cached path would fail to find the + // file. Additionally, the failure would cause us to disable legacy + // short-circuit read for all subsequent use in the ClientContext. Unlike + // the newer short-circuit read implementation, we have no communication + // channel for the DataNode to notify the client that the path has been + // invalidated. Therefore, our only option is to skip caching. 
+ if (pathinfo != null && !storageType.isTransient()) { + LOG.debug("Cached location of block {} as {}", blk, pathinfo); + localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo); + } + } catch (IOException e) { + localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error + throw e; + } + return pathinfo; + } + + private static int getSlowReadBufferNumChunks(int bufferSizeBytes, + int bytesPerChecksum) { + if (bufferSizeBytes < bytesPerChecksum) { + throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " + + "buffer size (" + bufferSizeBytes + ") is not large enough to hold " + + "a single chunk (" + bytesPerChecksum + "). Please configure " + + HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY + + " appropriately"); + } + + // Round down to nearest chunk size + return bufferSizeBytes / bytesPerChecksum; + } + + private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, + ExtendedBlock block, long startOffset, FileInputStream dataIn, + Tracer tracer) throws IOException { + this(conf, hdfsfile, block, startOffset, + DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false, + dataIn, startOffset, null, tracer); + } + + private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, + ExtendedBlock block, long startOffset, DataChecksum checksum, + boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, + FileInputStream checksumIn, Tracer tracer) throws IOException { + this.filename = hdfsfile; + this.checksum = checksum; + this.verifyChecksum = verifyChecksum; + this.startOffset = Math.max(startOffset, 0); + this.blockId = block.getBlockId(); + + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + + this.dataIn = dataIn; + this.checksumIn = checksumIn; + this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); + + final int chunksPerChecksumRead = getSlowReadBufferNumChunks( + conf.getShortCircuitBufferSize(), bytesPerChecksum); + slowReadBuff = bufferPool.getBuffer( + bytesPerChecksum * chunksPerChecksumRead); + checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); + // Initially the buffers have nothing to read. + slowReadBuff.flip(); + checksumBuff.flip(); + boolean success = false; + try { + // Skip both input streams to beginning of the chunk containing + // startOffset + IOUtils.skipFully(dataIn, firstChunkOffset); + if (checksumIn != null) { + long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * + checksumSize; + IOUtils.skipFully(checksumIn, checkSumOffset); + } + success = true; + } finally { + if (!success) { + bufferPool.returnBuffer(slowReadBuff); + bufferPool.returnBuffer(checksumBuff); + } + } + this.tracer = tracer; + } + + /** + * Reads bytes into a buffer until EOF or the buffer's limit is reached + */ + private int fillBuffer(FileInputStream stream, ByteBuffer buf) + throws IOException { + try (TraceScope ignored = tracer. + newScope("BlockReaderLocalLegacy#fillBuffer(" + blockId + ")")) { + int bytesRead = stream.getChannel().read(buf); + if (bytesRead < 0) { + //EOF + return bytesRead; + } + while (buf.remaining() > 0) { + int n = stream.getChannel().read(buf); + if (n < 0) { + //EOF + return bytesRead; + } + bytesRead += n; + } + return bytesRead; + } + } + + /** + * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into + * another. 
+ */ + private void writeSlice(ByteBuffer from, ByteBuffer to, int length) { + int oldLimit = from.limit(); + from.limit(from.position() + length); + try { + to.put(from); + } finally { + from.limit(oldLimit); + } + } + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + int nRead = 0; + if (verifyChecksum) { + // A 'direct' read actually has three phases. The first drains any + // remaining bytes from the slow read buffer. After this the read is + // guaranteed to be on a checksum chunk boundary. If there are still bytes + // to read, the fast direct path is used for as many remaining bytes as + // possible, up to a multiple of the checksum chunk size. Finally, any + // 'odd' bytes remaining at the end of the read cause another slow read to + // be issued, which involves an extra copy. + + // Every 'slow' read tries to fill the slow read buffer in one go for + // efficiency's sake. As described above, all non-checksum-chunk-aligned + // reads will be served from the slower read path. + + if (slowReadBuff.hasRemaining()) { + // There are remaining bytes from a small read available. This usually + // means this read is unaligned, which falls back to the slow path. + int fromSlowReadBuff = Math.min(buf.remaining(), + slowReadBuff.remaining()); + writeSlice(slowReadBuff, buf, fromSlowReadBuff); + nRead += fromSlowReadBuff; + } + + if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) { + // Since we have drained the 'small read' buffer, we are guaranteed to + // be chunk-aligned + int len = buf.remaining() - (buf.remaining() % bytesPerChecksum); + + // There's only enough checksum buffer space available to checksum one + // entire slow read buffer. This saves keeping the number of checksum + // chunks around. + len = Math.min(len, slowReadBuff.capacity()); + int oldlimit = buf.limit(); + buf.limit(buf.position() + len); + int readResult = 0; + try { + readResult = doByteBufferRead(buf); + } finally { + buf.limit(oldlimit); + } + if (readResult == -1) { + return nRead; + } else { + nRead += readResult; + buf.position(buf.position() + readResult); + } + } + + // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read + // until chunk boundary + if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || + offsetFromChunkBoundary > 0) { + int toRead = Math.min(buf.remaining(), + bytesPerChecksum - offsetFromChunkBoundary); + int readResult = fillSlowReadBuffer(toRead); + if (readResult == -1) { + return nRead; + } else { + int fromSlowReadBuff = Math.min(readResult, buf.remaining()); + writeSlice(slowReadBuff, buf, fromSlowReadBuff); + nRead += fromSlowReadBuff; + } + } + } else { + // Non-checksummed reads are much easier; we can just fill the buffer + // directly. + nRead = doByteBufferRead(buf); + if (nRead > 0) { + buf.position(buf.position() + nRead); + } + } + return nRead; + } + + /** + * Tries to read as many bytes as possible into supplied buffer, checksumming + * each chunk if needed. + * + * <b>Preconditions:</b> + * <ul> + * <li> + * If checksumming is enabled, buf.remaining must be a multiple of + * bytesPerChecksum. Note that this is not a requirement for clients of + * read(ByteBuffer) - in the case of non-checksum-sized read requests, + * read(ByteBuffer) will substitute a suitably sized buffer to pass to this + * method. 
+ * </li> + * </ul> + * <b>Postconditions:</b> + * <ul> + * <li>buf.limit and buf.mark are unchanged.</li> + * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the + * requested bytes can be read straight from the buffer</li> + * </ul> + * + * @param buf + * byte buffer to write bytes to. If checksums are not required, buf + * can have any number of bytes remaining, otherwise there must be a + * multiple of the checksum chunk size remaining. + * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt> + * that is, the number of useful bytes (up to the amount + * requested) readable from the buffer by the client. + */ + private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException { + if (verifyChecksum) { + assert buf.remaining() % bytesPerChecksum == 0; + } + int dataRead; + + int oldpos = buf.position(); + // Read as much as we can into the buffer. + dataRead = fillBuffer(dataIn, buf); + + if (dataRead == -1) { + return -1; + } + + if (verifyChecksum) { + ByteBuffer toChecksum = buf.duplicate(); + toChecksum.position(oldpos); + toChecksum.limit(oldpos + dataRead); + + checksumBuff.clear(); + // Equivalent to + // (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum ); + int numChunks = + (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum; + checksumBuff.limit(checksumSize * numChunks); + + fillBuffer(checksumIn, checksumBuff); + checksumBuff.flip(); + + checksum.verifyChunkedSums(toChecksum, checksumBuff, filename, + this.startOffset); + } + + if (dataRead >= 0) { + buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead)); + } + + if (dataRead < offsetFromChunkBoundary) { + // yikes, didn't even get enough bytes to honour offset. This can happen + // even if we are verifying checksums if we are at EOF. + offsetFromChunkBoundary -= dataRead; + dataRead = 0; + } else { + dataRead -= offsetFromChunkBoundary; + offsetFromChunkBoundary = 0; + } + + return dataRead; + } + + /** + * Ensures that up to len bytes are available and checksummed in the slow read + * buffer. The number of bytes available to read is returned. If the buffer is + * not already empty, the number of remaining bytes is returned and no actual + * read happens. + * + * @param len + * the maximum number of bytes to make available. After len bytes + * are read, the underlying bytestream <b>must</b> be at a checksum + * boundary, or EOF. That is, (len + currentPosition) % + * bytesPerChecksum == 0. + * @return the number of bytes available to read, or -1 if EOF. + */ + private synchronized int fillSlowReadBuffer(int len) throws IOException { + int nRead; + if (slowReadBuff.hasRemaining()) { + // Already got data, good to go. + nRead = Math.min(len, slowReadBuff.remaining()); + } else { + // Round a complete read of len bytes (plus any implicit offset) to the + // next chunk boundary, since we try to read in multiples of a chunk + int nextChunk = len + offsetFromChunkBoundary + + (bytesPerChecksum - + ((len + offsetFromChunkBoundary) % bytesPerChecksum)); + int limit = Math.min(nextChunk, slowReadBuff.capacity()); + assert limit % bytesPerChecksum == 0; + + slowReadBuff.clear(); + slowReadBuff.limit(limit); + + nRead = doByteBufferRead(slowReadBuff); + + if (nRead > 0) { + // So that next time we call slowReadBuff.hasRemaining(), we don't get a + // false positive.
+ slowReadBuff.limit(nRead + slowReadBuff.position()); + } + } + return nRead; + } + + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + LOG.trace("read off {} len {}", off, len); + if (!verifyChecksum) { + return dataIn.read(buf, off, len); + } + + int nRead = fillSlowReadBuffer(slowReadBuff.capacity()); + + if (nRead > 0) { + // Possible that buffer is filled with a larger read than we need, since + // we tried to read as much as possible at once + nRead = Math.min(len, nRead); + slowReadBuff.get(buf, off, nRead); + } + + return nRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + LOG.debug("skip {}", n); + if (n <= 0) { + return 0; + } + if (!verifyChecksum) { + return dataIn.skip(n); + } + + // caller made sure newPosition is not beyond EOF. + int remaining = slowReadBuff.remaining(); + int position = slowReadBuff.position(); + int newPosition = position + (int)n; + + // if the new offset is already read into dataBuff, just reposition + if (n <= remaining) { + assert offsetFromChunkBoundary == 0; + slowReadBuff.position(newPosition); + return n; + } + + // for small gap, read through to keep the data/checksum in sync + if (n - remaining <= bytesPerChecksum) { + slowReadBuff.position(position + remaining); + if (skipBuf == null) { + skipBuf = new byte[bytesPerChecksum]; + } + int ret = read(skipBuf, 0, (int)(n - remaining)); + return (remaining + ret); + } + + // optimize for big gap: discard the current buffer, skip to + // the beginning of the appropriate checksum chunk and then + // read to the middle of that chunk to be in sync with checksums. + + // We can't use this.offsetFromChunkBoundary because we need to know how + // many bytes of the offset were really read. Calling read(..) with a + // positive this.offsetFromChunkBoundary causes that many bytes to get + // silently skipped. + int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum; + long toskip = n - remaining - myOffsetFromChunkBoundary; + + slowReadBuff.position(slowReadBuff.limit()); + checksumBuff.position(checksumBuff.limit()); + + IOUtils.skipFully(dataIn, toskip); + long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize; + IOUtils.skipFully(checksumIn, checkSumOffset); + + // read into the middle of the chunk + if (skipBuf == null) { + skipBuf = new byte[bytesPerChecksum]; + } + assert skipBuf.length == bytesPerChecksum; + assert myOffsetFromChunkBoundary < bytesPerChecksum; + + int ret = read(skipBuf, 0, myOffsetFromChunkBoundary); + + if (ret == -1) { // EOS + return (toskip + remaining); + } else { + return (toskip + remaining + ret); + } + } + + @Override + public synchronized void close() throws IOException { + IOUtilsClient.cleanup(LOG, dataIn, checksumIn); + if (slowReadBuff != null) { + bufferPool.returnBuffer(slowReadBuff); + slowReadBuff = null; + } + if (checksumBuff != null) { + bufferPool.returnBuffer(checksumBuff); + checksumBuff = null; + } + startOffset = -1; + checksum = null; + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public void readFully(byte[] buf, int off, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, off, len); + } + + @Override + public int available() { + // We never do network I/O in BlockReaderLocalLegacy. 
+ return Integer.MAX_VALUE; + } + + @Override + public boolean isShortCircuit() { + return true; + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + return null; + } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } + + @Override + public int getNetworkDistance() { + return 0; + } +}
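The chunk-alignment bookkeeping in newBlockReader and doByteBufferRead above boils down to modular arithmetic: the read is rewound to the start of the enclosing checksum chunk, and the unwanted leading bytes are remembered in offsetFromChunkBoundary and skipped after verification. A small worked sketch of that calculation, using made-up numbers rather than values taken from the patch:

// Sketch of the checksum-chunk alignment math used by the legacy reader.
// All values below are illustrative; bytesPerChecksum is commonly 512 in HDFS,
// but the real value comes from the block's metadata header.
public class ChunkAlignmentSketch {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;   // checksum chunk size (assumed)
    int checksumSize = 4;         // a CRC32 checksum is 4 bytes per chunk
    long startOffset = 1300;      // where the caller wants to start reading

    // Reads must start on a chunk boundary so checksums can be verified,
    // so the reader seeks back to the start of the enclosing chunk...
    long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum); // 1024

    // ...and remembers how many leading bytes of that chunk the caller did
    // not ask for, to be dropped after verification.
    int offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);   // 276

    // The checksum stream is positioned at the matching checksum entry.
    long checksumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize; // 8

    System.out.println(firstChunkOffset + " " + offsetFromChunkBoundary
        + " " + checksumOffset);
  }
}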

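For context on how this reader gets selected: the client has to enable short-circuit reads and explicitly opt into the legacy (HDFS-2246) implementation, and the datanode must additionally grant the reading user direct access to its block files (via dfs.block.local-path-access.user, to the best of my knowledge). A hedged client-side sketch follows; the file path is a placeholder, and exact key names and defaults can differ between Hadoop releases.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: enables short-circuit reads and opts into the legacy
// (HDFS-2246) local reader on the client side. The datanode must separately
// whitelist the reading user; key names and defaults vary by release.
public class LegacyShortCircuitReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("dfs.client.read.shortcircuit", true);
    conf.setBoolean("dfs.client.use.legacy.blockreader.local", true);
    conf.setInt("dfs.client.read.shortcircuit.buffer.size", 1024 * 1024);

    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(new Path("/tmp/example.txt"))) { // placeholder path
      byte[] buf = new byte[4096];
      int n = in.read(buf); // served by BlockReaderLocalLegacy when the read is eligible
      System.out.println("read " + n + " bytes");
    }
  }
}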