HDFS-8057 Move BlockReader implementation to the client implementation package. Contributed by Takanobu Asanuma
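
This commit relocates the BlockReader implementations from org.apache.hadoop.hdfs to org.apache.hadoop.hdfs.client.impl and, per the file list below, renames the remote readers (RemoteBlockReader -> BlockReaderRemote, RemoteBlockReader2 -> BlockReaderRemote2). A minimal sketch, not from the patch itself, of what the move means for code that imported the old names:

    // Before this commit (old package, old names):
    //   import org.apache.hadoop.hdfs.BlockReaderFactory;
    //   import org.apache.hadoop.hdfs.RemoteBlockReader2;
    // After this commit:
    import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
    import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote2;  // was RemoteBlockReader2
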
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e5dab680
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e5dab680
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e5dab680

Branch: refs/heads/branch-2
Commit: e5dab68066d73811cd6306603f24ceddf8c603f7
Parents: 0ac8c09
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Wed May 25 12:12:27 2016 -0700
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Wed May 25 12:12:27 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/BlockReaderFactory.java  | 870 ------------------
 .../apache/hadoop/hdfs/BlockReaderLocal.java    | 719 ---------------
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     | 739 ----------------
 .../org/apache/hadoop/hdfs/BlockReaderUtil.java |  57 --
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |   1 +
 .../apache/hadoop/hdfs/ExternalBlockReader.java | 126 ---
 .../apache/hadoop/hdfs/RemoteBlockReader.java   | 505 -----------
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  | 467 ----------
 .../hdfs/client/impl/BlockReaderFactory.java    | 878 +++++++++++++++++++
 .../hdfs/client/impl/BlockReaderLocal.java      | 720 +++++++++++++++
 .../client/impl/BlockReaderLocalLegacy.java     | 740 ++++++++++++++++
 .../hdfs/client/impl/BlockReaderRemote.java     | 507 +++++++++++
 .../hdfs/client/impl/BlockReaderRemote2.java    | 469 ++++++++++
 .../hdfs/client/impl/BlockReaderUtil.java       |  58 ++
 .../hdfs/client/impl/ExternalBlockReader.java   | 128 +++
 .../dev-support/findbugsExcludeFile.xml         |   2 +-
 .../hdfs/server/namenode/NamenodeFsck.java      |   2 +-
 .../hadoop/fs/TestEnhancedByteBufferAccess.java |   3 +-
 .../apache/hadoop/hdfs/BlockReaderTestUtil.java | 259 ------
 .../apache/hadoop/hdfs/TestBlockReaderBase.java |  94 --
 .../hadoop/hdfs/TestBlockReaderFactory.java     | 534 -----------
 .../hadoop/hdfs/TestBlockReaderLocal.java       | 780 ----------------
 .../hadoop/hdfs/TestBlockReaderLocalLegacy.java | 220 -----
 .../hdfs/TestClientBlockVerification.java       | 125 ---
 .../org/apache/hadoop/hdfs/TestConnCache.java   |   1 +
 .../hadoop/hdfs/TestDisableConnCache.java       |   1 +
 .../hadoop/hdfs/TestParallelReadUtil.java       |   1 +
 .../hadoop/hdfs/TestRemoteBlockReader.java      |  29 -
 .../hadoop/hdfs/TestRemoteBlockReader2.java     |  25 -
 .../hdfs/client/impl/BlockReaderTestUtil.java   | 268 ++++++
 .../hdfs/client/impl/TestBlockReaderBase.java   |  97 ++
 .../client/impl/TestBlockReaderFactory.java     | 539 ++++++++++++
 .../hdfs/client/impl/TestBlockReaderLocal.java  | 786 +++++++++++++++++
 .../client/impl/TestBlockReaderLocalLegacy.java | 227 +++++
 .../hdfs/client/impl/TestBlockReaderRemote.java |  30 +
 .../client/impl/TestBlockReaderRemote2.java     |  27 +
 .../impl/TestClientBlockVerification.java       | 126 +++
 .../blockmanagement/TestBlockTokenWithDFS.java  |   2 +-
 .../datanode/TestDataNodeVolumeFailure.java     |   2 +-
 .../server/datanode/TestFsDatasetCache.java     |   3 +-
 .../datanode/TestFsDatasetCacheRevocation.java  |   2 +-
 .../server/namenode/TestCacheDirectives.java    |   3 +-
 .../shortcircuit/TestShortCircuitCache.java     |   4 +-
 .../shortcircuit/TestShortCircuitLocalRead.java |   3 +-
 44 files changed, 5615 insertions(+), 5564 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java deleted file mode 100644 index 7af4609..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ /dev/null @@ -1,870 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; - -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.net.InetSocketAddress; -import java.util.List; - -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; -import org.apache.commons.lang.mutable.MutableBoolean; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; -import org.apache.hadoop.hdfs.net.DomainPeer; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; -import org.apache.hadoop.hdfs.util.IOUtilsClient; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; -import 
org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.PerformanceAdvisory; -import org.apache.hadoop.util.Time; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.htrace.core.Tracer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Utility class to create BlockReader implementations. - */ [email protected] -public class BlockReaderFactory implements ShortCircuitReplicaCreator { - static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class); - - public static class FailureInjector { - public void injectRequestFileDescriptorsFailure() throws IOException { - // do nothing - } - public boolean getSupportsReceiptVerification() { - return true; - } - } - - @VisibleForTesting - static ShortCircuitReplicaCreator - createShortCircuitReplicaInfoCallback = null; - - private final DfsClientConf conf; - - /** - * Injects failures into specific operations during unit tests. - */ - private static FailureInjector failureInjector = new FailureInjector(); - - /** - * The file name, for logging and debugging purposes. - */ - private String fileName; - - /** - * The block ID and block pool ID to use. - */ - private ExtendedBlock block; - - /** - * The block token to use for security purposes. - */ - private Token<BlockTokenIdentifier> token; - - /** - * The offset within the block to start reading at. - */ - private long startOffset; - - /** - * If false, we won't try to verify the block checksum. - */ - private boolean verifyChecksum; - - /** - * The name of this client. - */ - private String clientName; - - /** - * The DataNode we're talking to. - */ - private DatanodeInfo datanode; - - /** - * StorageType of replica on DataNode. - */ - private StorageType storageType; - - /** - * If false, we won't try short-circuit local reads. - */ - private boolean allowShortCircuitLocalReads; - - /** - * The ClientContext to use for things like the PeerCache. - */ - private ClientContext clientContext; - - /** - * Number of bytes to read. Must be set to a non-negative value. - */ - private long length = -1; - - /** - * Caching strategy to use when reading the block. - */ - private CachingStrategy cachingStrategy; - - /** - * Socket address to use to connect to peer. - */ - private InetSocketAddress inetSocketAddress; - - /** - * Remote peer factory to use to create a peer, if needed. - */ - private RemotePeerFactory remotePeerFactory; - - /** - * UserGroupInformation to use for legacy block reader local objects, - * if needed. - */ - private UserGroupInformation userGroupInformation; - - /** - * Configuration to use for legacy block reader local objects, if needed. - */ - private Configuration configuration; - - /** - * The HTrace tracer to use. - */ - private Tracer tracer; - - /** - * Information about the domain socket path we should use to connect to the - * local peer-- or null if we haven't examined the local domain socket. - */ - private DomainSocketFactory.PathInfo pathInfo; - - /** - * The remaining number of times that we'll try to pull a socket out of the - * cache. 
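
The fields above are populated through the fluent setters that follow below. A hedged sketch, not from the patch, of how a caller might assemble a reader with them; every identifier passed in (dfsConf, block, token, dn, addr, peers, ctx, conf, tracer, blockLength) is an assumed caller-supplied value:

    // Sketch only: wiring the factory via the setters defined below.
    BlockReader reader = new BlockReaderFactory(dfsConf)
        .setFileName("/user/example/file")   // used for logging/debugging only
        .setBlock(block)
        .setBlockToken(token)
        .setStartOffset(0)
        .setLength(blockLength)              // build() requires length >= 0
        .setVerifyChecksum(true)
        .setClientName("example-client")
        .setDatanodeInfo(dn)
        .setInetSocketAddress(addr)
        .setRemotePeerFactory(peers)
        .setClientCacheContext(ctx)
        .setConfiguration(conf)              // build() requires non-null
        .setTracer(tracer)
        .build();
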
- */ - private int remainingCacheTries; - - public BlockReaderFactory(DfsClientConf conf) { - this.conf = conf; - this.remainingCacheTries = conf.getNumCachedConnRetry(); - } - - public BlockReaderFactory setFileName(String fileName) { - this.fileName = fileName; - return this; - } - - public BlockReaderFactory setBlock(ExtendedBlock block) { - this.block = block; - return this; - } - - public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) { - this.token = token; - return this; - } - - public BlockReaderFactory setStartOffset(long startOffset) { - this.startOffset = startOffset; - return this; - } - - public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) { - this.verifyChecksum = verifyChecksum; - return this; - } - - public BlockReaderFactory setClientName(String clientName) { - this.clientName = clientName; - return this; - } - - public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) { - this.datanode = datanode; - return this; - } - - public BlockReaderFactory setStorageType(StorageType storageType) { - this.storageType = storageType; - return this; - } - - public BlockReaderFactory setAllowShortCircuitLocalReads( - boolean allowShortCircuitLocalReads) { - this.allowShortCircuitLocalReads = allowShortCircuitLocalReads; - return this; - } - - public BlockReaderFactory setClientCacheContext( - ClientContext clientContext) { - this.clientContext = clientContext; - return this; - } - - public BlockReaderFactory setLength(long length) { - this.length = length; - return this; - } - - public BlockReaderFactory setCachingStrategy( - CachingStrategy cachingStrategy) { - this.cachingStrategy = cachingStrategy; - return this; - } - - public BlockReaderFactory setInetSocketAddress ( - InetSocketAddress inetSocketAddress) { - this.inetSocketAddress = inetSocketAddress; - return this; - } - - public BlockReaderFactory setUserGroupInformation( - UserGroupInformation userGroupInformation) { - this.userGroupInformation = userGroupInformation; - return this; - } - - public BlockReaderFactory setRemotePeerFactory( - RemotePeerFactory remotePeerFactory) { - this.remotePeerFactory = remotePeerFactory; - return this; - } - - public BlockReaderFactory setConfiguration( - Configuration configuration) { - this.configuration = configuration; - return this; - } - - public BlockReaderFactory setTracer(Tracer tracer) { - this.tracer = tracer; - return this; - } - - @VisibleForTesting - public static void setFailureInjectorForTesting(FailureInjector injector) { - failureInjector = injector; - } - - /** - * Build a BlockReader with the given options. - * - * This function will do the best it can to create a block reader that meets - * all of our requirements. We prefer short-circuit block readers - * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the - * former avoid the overhead of socket communication. If short-circuit is - * unavailable, our next fallback is data transfer over UNIX domain sockets, - * if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't - * work, we will try to create a remote block reader that operates over TCP - * sockets. - * - * There are a few caches that are important here. - * - * The ShortCircuitCache stores file descriptor objects which have been passed - * from the DataNode. - * - * The DomainSocketFactory stores information about UNIX domain socket paths - * that we not been able to use in the past, so that we don't waste time - * retrying them over and over. 
(Like all the caches, it does have a timeout, - * though.) - * - * The PeerCache stores peers that we have used in the past. If we can reuse - * one of these peers, we avoid the overhead of re-opening a socket. However, - * if the socket has been timed out on the remote end, our attempt to reuse - * the socket may end with an IOException. For that reason, we limit our - * attempts at socket reuse to dfs.client.cached.conn.retry times. After - * that, we create new sockets. This avoids the problem where a thread tries - * to talk to a peer that it hasn't talked to in a while, and has to clean out - * every entry in a socket cache full of stale entries. - * - * @return The new BlockReader. We will not return null. - * - * @throws InvalidToken - * If the block token was invalid. - * InvalidEncryptionKeyException - * If the encryption key was invalid. - * Other IOException - * If there was another problem. - */ - public BlockReader build() throws IOException { - Preconditions.checkNotNull(configuration); - Preconditions - .checkState(length >= 0, "Length must be set to a non-negative value"); - BlockReader reader = tryToCreateExternalBlockReader(); - if (reader != null) { - return reader; - } - final ShortCircuitConf scConf = conf.getShortCircuitConf(); - if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) { - if (clientContext.getUseLegacyBlockReaderLocal()) { - reader = getLegacyBlockReaderLocal(); - if (reader != null) { - LOG.trace("{}: returning new legacy block reader local.", this); - return reader; - } - } else { - reader = getBlockReaderLocal(); - if (reader != null) { - LOG.trace("{}: returning new block reader local.", this); - return reader; - } - } - } - if (scConf.isDomainSocketDataTraffic()) { - reader = getRemoteBlockReaderFromDomain(); - if (reader != null) { - LOG.trace("{}: returning new remote block reader using UNIX domain " - + "socket on {}", this, pathInfo.getPath()); - return reader; - } - } - Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting, - "TCP reads were disabled for testing, but we failed to " + - "do a non-TCP read."); - return getRemoteBlockReaderFromTcp(); - } - - private BlockReader tryToCreateExternalBlockReader() { - List<Class<? extends ReplicaAccessorBuilder>> clses = - conf.getReplicaAccessorBuilderClasses(); - for (Class<? extends ReplicaAccessorBuilder> cls : clses) { - try { - ByteArrayDataOutput bado = ByteStreams.newDataOutput(); - token.write(bado); - byte tokenBytes[] = bado.toByteArray(); - - Constructor<? extends ReplicaAccessorBuilder> ctor = - cls.getConstructor(); - ReplicaAccessorBuilder builder = ctor.newInstance(); - long visibleLength = startOffset + length; - ReplicaAccessor accessor = builder. - setAllowShortCircuitReads(allowShortCircuitLocalReads). - setBlock(block.getBlockId(), block.getBlockPoolId()). - setGenerationStamp(block.getGenerationStamp()). - setBlockAccessToken(tokenBytes). - setClientName(clientName). - setConfiguration(configuration). - setFileName(fileName). - setVerifyChecksum(verifyChecksum). - setVisibleLength(visibleLength). - build(); - if (accessor == null) { - LOG.trace("{}: No ReplicaAccessor created by {}", - this, cls.getName()); - } else { - return new ExternalBlockReader(accessor, visibleLength, startOffset); - } - } catch (Throwable t) { - LOG.warn("Failed to construct new object of type " + - cls.getName(), t); - } - } - return null; - } - - - /** - * Get {@link BlockReaderLocalLegacy} for short circuited local reads. 
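
The fallback chain that build() walks through above is gated by client configuration. A hedged sketch of the relevant keys; the key strings are the standard HDFS names to the best of my knowledge, and the values are illustrative:

    Configuration conf = new Configuration();
    conf.setBoolean("dfs.client.read.shortcircuit", true);   // enables the local readers
    conf.set("dfs.domain.socket.path",
        "/var/lib/hadoop-hdfs/dn_socket");                   // must match the DataNode's socket
    conf.setBoolean("dfs.client.domain.socket.data.traffic",
        false);                                              // domain-socket fallback (see above)
    conf.setInt("dfs.client.cached.conn.retry", 3);          // bounds remainingCacheTries
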
- * This block reader implements the path-based style of local reads - * first introduced in HDFS-2246. - */ - private BlockReader getLegacyBlockReaderLocal() throws IOException { - LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this); - if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) { - LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address" - + "{} is not local", this, inetSocketAddress); - return null; - } - if (clientContext.getDisableLegacyBlockReaderLocal()) { - PerformanceAdvisory.LOG.debug("{}: can't construct " + - "BlockReaderLocalLegacy because " + - "disableLegacyBlockReaderLocal is set.", this); - return null; - } - IOException ioe; - try { - return BlockReaderLocalLegacy.newBlockReader(conf, - userGroupInformation, configuration, fileName, block, token, - datanode, startOffset, length, storageType, tracer); - } catch (RemoteException remoteException) { - ioe = remoteException.unwrapRemoteException( - InvalidToken.class, AccessControlException.class); - } catch (IOException e) { - ioe = e; - } - if ((!(ioe instanceof AccessControlException)) && - isSecurityException(ioe)) { - // Handle security exceptions. - // We do not handle AccessControlException here, since - // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate - // that the user is not in dfs.block.local-path-access.user, a condition - // which requires us to disable legacy SCR. - throw ioe; - } - LOG.warn(this + ": error creating legacy BlockReaderLocal. " + - "Disabling legacy local reads.", ioe); - clientContext.setDisableLegacyBlockReaderLocal(); - return null; - } - - private BlockReader getBlockReaderLocal() throws InvalidToken { - LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit " - + " reads.", this); - if (pathInfo == null) { - pathInfo = clientContext.getDomainSocketFactory() - .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); - } - if (!pathInfo.getPathState().getUsableForShortCircuit()) { - PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " + - "giving up on BlockReaderLocal.", this, pathInfo); - return null; - } - ShortCircuitCache cache = clientContext.getShortCircuitCache(); - ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), - block.getBlockPoolId()); - ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); - InvalidToken exc = info.getInvalidTokenException(); - if (exc != null) { - LOG.trace("{}: got InvalidToken exception while trying to construct " - + "BlockReaderLocal via {}", this, pathInfo.getPath()); - throw exc; - } - if (info.getReplica() == null) { - PerformanceAdvisory.LOG.debug("{}: failed to get " + - "ShortCircuitReplica. Cannot construct " + - "BlockReaderLocal via {}", this, pathInfo.getPath()); - return null; - } - return new BlockReaderLocal.Builder(conf.getShortCircuitConf()). - setFilename(fileName). - setBlock(block). - setStartOffset(startOffset). - setShortCircuitReplica(info.getReplica()). - setVerifyChecksum(verifyChecksum). - setCachingStrategy(cachingStrategy). - setStorageType(storageType). - setTracer(tracer). - build(); - } - - /** - * Fetch a pair of short-circuit block descriptors from a local DataNode. - * - * @return Null if we could not communicate with the datanode, - * a new ShortCircuitReplicaInfo object otherwise. - * ShortCircuitReplicaInfo objects may contain either an - * InvalidToken exception, or a ShortCircuitReplica object ready to - * use. 
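
The legacy path served by getLegacyBlockReaderLocal() above only fires when it is switched on client-side. A hedged sketch of that switch; the whitelist key is quoted from the comment above, while the toggle key name is an assumption based on the standard configuration:

    Configuration conf = new Configuration();
    conf.setBoolean("dfs.client.use.legacy.blockreader.local", true);
    // DataNode side: the reading user must be whitelisted for path-based
    // access, per the dfs.block.local-path-access.user check noted above.
    conf.set("dfs.block.local-path-access.user", "exampleuser"); // illustrative
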
- */ - @Override - public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { - if (createShortCircuitReplicaInfoCallback != null) { - ShortCircuitReplicaInfo info = - createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo(); - if (info != null) return info; - } - LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this); - BlockReaderPeer curPeer; - while (true) { - curPeer = nextDomainPeer(); - if (curPeer == null) break; - if (curPeer.fromCache) remainingCacheTries--; - DomainPeer peer = (DomainPeer)curPeer.peer; - Slot slot = null; - ShortCircuitCache cache = clientContext.getShortCircuitCache(); - try { - MutableBoolean usedPeer = new MutableBoolean(false); - slot = cache.allocShmSlot(datanode, peer, usedPeer, - new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()), - clientName); - if (usedPeer.booleanValue()) { - LOG.trace("{}: allocShmSlot used up our previous socket {}. " - + "Allocating a new one...", this, peer.getDomainSocket()); - curPeer = nextDomainPeer(); - if (curPeer == null) break; - peer = (DomainPeer)curPeer.peer; - } - ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot); - clientContext.getPeerCache().put(datanode, peer); - return info; - } catch (IOException e) { - if (slot != null) { - cache.freeSlot(slot); - } - if (curPeer.fromCache) { - // Handle an I/O error we got when using a cached socket. - // These are considered less serious, because the socket may be stale. - LOG.debug("{}: closing stale domain peer {}", this, peer, e); - IOUtilsClient.cleanup(LOG, peer); - } else { - // Handle an I/O error we got when using a newly created socket. - // We temporarily disable the domain socket path for a few minutes in - // this case, to prevent wasting more time on it. - LOG.warn(this + ": I/O error requesting file descriptors. " + - "Disabling domain socket " + peer.getDomainSocket(), e); - IOUtilsClient.cleanup(LOG, peer); - clientContext.getDomainSocketFactory() - .disableDomainSocketPath(pathInfo.getPath()); - return null; - } - } - } - return null; - } - - /** - * Request file descriptors from a DomainPeer. - * - * @param peer The peer to use for communication. - * @param slot If non-null, the shared memory slot to associate with the - * new ShortCircuitReplica. - * - * @return A ShortCircuitReplica object if we could communicate with the - * datanode; null, otherwise. - * @throws IOException If we encountered an I/O exception while communicating - * with the datanode. - */ - private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, - Slot slot) throws IOException { - ShortCircuitCache cache = clientContext.getShortCircuitCache(); - final DataOutputStream out = - new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); - SlotId slotId = slot == null ? 
null : slot.getSlotId(); - new Sender(out).requestShortCircuitFds(block, token, slotId, 1, - failureInjector.getSupportsReceiptVerification()); - DataInputStream in = new DataInputStream(peer.getInputStream()); - BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); - DomainSocket sock = peer.getDomainSocket(); - failureInjector.injectRequestFileDescriptorsFailure(); - switch (resp.getStatus()) { - case SUCCESS: - byte buf[] = new byte[1]; - FileInputStream[] fis = new FileInputStream[2]; - sock.recvFileInputStreams(fis, buf, 0, buf.length); - ShortCircuitReplica replica = null; - try { - ExtendedBlockId key = - new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); - if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) { - LOG.trace("Sending receipt verification byte for slot {}", slot); - sock.getOutputStream().write(0); - } - replica = new ShortCircuitReplica(key, fis[0], fis[1], cache, - Time.monotonicNow(), slot); - return new ShortCircuitReplicaInfo(replica); - } catch (IOException e) { - // This indicates an error reading from disk, or a format error. Since - // it's not a socket communication problem, we return null rather than - // throwing an exception. - LOG.warn(this + ": error creating ShortCircuitReplica.", e); - return null; - } finally { - if (replica == null) { - IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]); - } - } - case ERROR_UNSUPPORTED: - if (!resp.hasShortCircuitAccessVersion()) { - LOG.warn("short-circuit read access is disabled for " + - "DataNode " + datanode + ". reason: " + resp.getMessage()); - clientContext.getDomainSocketFactory() - .disableShortCircuitForPath(pathInfo.getPath()); - } else { - LOG.warn("short-circuit read access for the file " + - fileName + " is disabled for DataNode " + datanode + - ". reason: " + resp.getMessage()); - } - return null; - case ERROR_ACCESS_TOKEN: - String msg = "access control error while " + - "attempting to set up short-circuit access to " + - fileName + resp.getMessage(); - LOG.debug("{}:{}", this, msg); - return new ShortCircuitReplicaInfo(new InvalidToken(msg)); - default: - LOG.warn(this + ": unknown response code " + resp.getStatus() + - " while attempting to set up short-circuit access. " + - resp.getMessage()); - clientContext.getDomainSocketFactory() - .disableShortCircuitForPath(pathInfo.getPath()); - return null; - } - } - - /** - * Get a RemoteBlockReader that communicates over a UNIX domain socket. - * - * @return The new BlockReader, or null if we failed to create the block - * reader. - * - * @throws InvalidToken If the block token was invalid. - * Potentially other security-related execptions. 
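
The fd-request handshake and receipt-verification byte above are exactly what the FailureInjector hook defined near the top of this file lets tests perturb. A hedged sketch of a unit test using that hook:

    // Sketch of a test override; the hook and its @VisibleForTesting setter
    // are both defined earlier in this file.
    BlockReaderFactory.setFailureInjectorForTesting(
        new BlockReaderFactory.FailureInjector() {
          @Override
          public void injectRequestFileDescriptorsFailure() throws IOException {
            throw new IOException("injected failure");  // exercises the error path
          }
          @Override
          public boolean getSupportsReceiptVerification() {
            return false;  // simulate a DataNode without receipt verification
          }
        });
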
- */ - private BlockReader getRemoteBlockReaderFromDomain() throws IOException { - if (pathInfo == null) { - pathInfo = clientContext.getDomainSocketFactory() - .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); - } - if (!pathInfo.getPathState().getUsableForDataTransfer()) { - PerformanceAdvisory.LOG.debug("{}: not trying to create a " + - "remote block reader because the UNIX domain socket at {}" + - " is not usable.", this, pathInfo); - return null; - } - LOG.trace("{}: trying to create a remote block reader from the UNIX domain " - + "socket at {}", this, pathInfo.getPath()); - - while (true) { - BlockReaderPeer curPeer = nextDomainPeer(); - if (curPeer == null) break; - if (curPeer.fromCache) remainingCacheTries--; - DomainPeer peer = (DomainPeer)curPeer.peer; - BlockReader blockReader = null; - try { - blockReader = getRemoteBlockReader(peer); - return blockReader; - } catch (IOException ioe) { - IOUtilsClient.cleanup(LOG, peer); - if (isSecurityException(ioe)) { - LOG.trace("{}: got security exception while constructing a remote " - + " block reader from the unix domain socket at {}", - this, pathInfo.getPath(), ioe); - throw ioe; - } - if (curPeer.fromCache) { - // Handle an I/O error we got when using a cached peer. These are - // considered less serious because the underlying socket may be stale. - LOG.debug("Closed potentially stale domain peer {}", peer, ioe); - } else { - // Handle an I/O error we got when using a newly created domain peer. - // We temporarily disable the domain socket path for a few minutes in - // this case, to prevent wasting more time on it. - LOG.warn("I/O error constructing remote block reader. Disabling " + - "domain socket " + peer.getDomainSocket(), ioe); - clientContext.getDomainSocketFactory() - .disableDomainSocketPath(pathInfo.getPath()); - return null; - } - } finally { - if (blockReader == null) { - IOUtilsClient.cleanup(LOG, peer); - } - } - } - return null; - } - - /** - * Get a RemoteBlockReader that communicates over a TCP socket. - * - * @return The new BlockReader. We will not return null, but instead throw - * an exception if this fails. - * - * @throws InvalidToken - * If the block token was invalid. - * InvalidEncryptionKeyException - * If the encryption key was invalid. - * Other IOException - * If there was another problem. - */ - private BlockReader getRemoteBlockReaderFromTcp() throws IOException { - LOG.trace("{}: trying to create a remote block reader from a TCP socket", - this); - BlockReader blockReader = null; - while (true) { - BlockReaderPeer curPeer = null; - Peer peer = null; - try { - curPeer = nextTcpPeer(); - if (curPeer.fromCache) remainingCacheTries--; - peer = curPeer.peer; - blockReader = getRemoteBlockReader(peer); - return blockReader; - } catch (IOException ioe) { - if (isSecurityException(ioe)) { - LOG.trace("{}: got security exception while constructing a remote " - + "block reader from {}", this, peer, ioe); - throw ioe; - } - if ((curPeer != null) && curPeer.fromCache) { - // Handle an I/O error we got when using a cached peer. These are - // considered less serious, because the underlying socket may be - // stale. - LOG.debug("Closed potentially stale remote peer {}", peer, ioe); - } else { - // Handle an I/O error we got when using a newly created peer. 
- LOG.warn("I/O error constructing remote block reader.", ioe); - throw ioe; - } - } finally { - if (blockReader == null) { - IOUtilsClient.cleanup(LOG, peer); - } - } - } - } - - public static class BlockReaderPeer { - final Peer peer; - final boolean fromCache; - - BlockReaderPeer(Peer peer, boolean fromCache) { - this.peer = peer; - this.fromCache = fromCache; - } - } - - /** - * Get the next DomainPeer-- either from the cache or by creating it. - * - * @return the next DomainPeer, or null if we could not construct one. - */ - private BlockReaderPeer nextDomainPeer() { - if (remainingCacheTries > 0) { - Peer peer = clientContext.getPeerCache().get(datanode, true); - if (peer != null) { - LOG.trace("nextDomainPeer: reusing existing peer {}", peer); - return new BlockReaderPeer(peer, true); - } - } - DomainSocket sock = clientContext.getDomainSocketFactory(). - createSocket(pathInfo, conf.getSocketTimeout()); - if (sock == null) return null; - return new BlockReaderPeer(new DomainPeer(sock), false); - } - - /** - * Get the next TCP-based peer-- either from the cache or by creating it. - * - * @return the next Peer, or null if we could not construct one. - * - * @throws IOException If there was an error while constructing the peer - * (such as an InvalidEncryptionKeyException) - */ - private BlockReaderPeer nextTcpPeer() throws IOException { - if (remainingCacheTries > 0) { - Peer peer = clientContext.getPeerCache().get(datanode, false); - if (peer != null) { - LOG.trace("nextTcpPeer: reusing existing peer {}", peer); - return new BlockReaderPeer(peer, true); - } - } - try { - Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token, - datanode); - LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer); - return new BlockReaderPeer(peer, false); - } catch (IOException e) { - LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to" - + "{}", datanode); - throw e; - } - } - - /** - * Determine if an exception is security-related. - * - * We need to handle these exceptions differently than other IOExceptions. - * They don't indicate a communication problem. Instead, they mean that there - * is some action the client needs to take, such as refetching block tokens, - * renewing encryption keys, etc. - * - * @param ioe The exception - * @return True only if the exception is security-related. 
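
nextDomainPeer() and nextTcpPeer() above share one retry-accounting pattern that is split between them and their callers. Condensed here into a single hedged restatement, not patch code, for clarity:

    // Callers decrement remainingCacheTries only when the peer came from the
    // PeerCache, so stale cached sockets cannot consume unlimited attempts.
    BlockReaderPeer curPeer = nextTcpPeer();  // tries the cache first while
                                              // remainingCacheTries > 0
    if (curPeer.fromCache) {
      remainingCacheTries--;                  // a stale cached socket costs one try
    }
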
- */ - private static boolean isSecurityException(IOException ioe) { - return (ioe instanceof InvalidToken) || - (ioe instanceof InvalidEncryptionKeyException) || - (ioe instanceof InvalidBlockTokenException) || - (ioe instanceof AccessControlException); - } - - @SuppressWarnings("deprecation") - private BlockReader getRemoteBlockReader(Peer peer) throws IOException { - int networkDistance = clientContext.getNetworkDistance(datanode); - if (conf.getShortCircuitConf().isUseLegacyBlockReader()) { - return RemoteBlockReader.newBlockReader(fileName, - block, token, startOffset, length, conf.getIoBufferSize(), - verifyChecksum, clientName, peer, datanode, - clientContext.getPeerCache(), cachingStrategy, tracer, - networkDistance); - } else { - return RemoteBlockReader2.newBlockReader( - fileName, block, token, startOffset, length, - verifyChecksum, clientName, peer, datanode, - clientContext.getPeerCache(), cachingStrategy, tracer, - networkDistance); - } - } - - @Override - public String toString() { - return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")"; - } - - /** - * File name to print when accessing a block directly (from servlets) - * @param s Address of the block location - * @param poolId Block pool ID of the block - * @param blockId Block ID of the block - * @return string that has a file name for debug purposes - */ - public static String getFileName(final InetSocketAddress s, - final String poolId, final long blockId) { - return s.toString() + ":" + poolId + ":" + blockId; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java deleted file mode 100644 index 262b341..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ /dev/null @@ -1,719 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.EnumSet; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; -import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.DirectBufferPool; -import org.apache.htrace.core.TraceScope; -import org.apache.htrace.core.Tracer; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * BlockReaderLocal enables local short circuited reads. If the DFS client is on - * the same machine as the datanode, then the client can read files directly - * from the local file system rather than going through the datanode for better - * performance. <br> - * {@link BlockReaderLocal} works as follows: - * <ul> - * <li>The client performing short circuit reads must be configured at the - * datanode.</li> - * <li>The client gets the file descriptors for the metadata file and the data - * file for the block using - * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}. - * </li> - * <li>The client reads the file descriptors.</li> - * </ul> - */ [email protected] -class BlockReaderLocal implements BlockReader { - static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class); - - private static final DirectBufferPool bufferPool = new DirectBufferPool(); - - public static class Builder { - private final int bufferSize; - private boolean verifyChecksum; - private int maxReadahead; - private String filename; - private ShortCircuitReplica replica; - private long dataPos; - private ExtendedBlock block; - private StorageType storageType; - private Tracer tracer; - - public Builder(ShortCircuitConf conf) { - this.maxReadahead = Integer.MAX_VALUE; - this.verifyChecksum = !conf.isSkipShortCircuitChecksums(); - this.bufferSize = conf.getShortCircuitBufferSize(); - } - - public Builder setVerifyChecksum(boolean verifyChecksum) { - this.verifyChecksum = verifyChecksum; - return this; - } - - public Builder setCachingStrategy(CachingStrategy cachingStrategy) { - long readahead = cachingStrategy.getReadahead() != null ? 
- cachingStrategy.getReadahead() : - HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT; - this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead); - return this; - } - - public Builder setFilename(String filename) { - this.filename = filename; - return this; - } - - public Builder setShortCircuitReplica(ShortCircuitReplica replica) { - this.replica = replica; - return this; - } - - public Builder setStartOffset(long startOffset) { - this.dataPos = Math.max(0, startOffset); - return this; - } - - public Builder setBlock(ExtendedBlock block) { - this.block = block; - return this; - } - - public Builder setStorageType(StorageType storageType) { - this.storageType = storageType; - return this; - } - - public Builder setTracer(Tracer tracer) { - this.tracer = tracer; - return this; - } - - public BlockReaderLocal build() { - Preconditions.checkNotNull(replica); - return new BlockReaderLocal(this); - } - } - - private boolean closed = false; - - /** - * Pair of streams for this block. - */ - private final ShortCircuitReplica replica; - - /** - * The data FileChannel. - */ - private final FileChannel dataIn; - - /** - * The next place we'll read from in the block data FileChannel. - * - * If data is buffered in dataBuf, this offset will be larger than the - * offset of the next byte which a read() operation will give us. - */ - private long dataPos; - - /** - * The Checksum FileChannel. - */ - private final FileChannel checksumIn; - - /** - * Checksum type and size. - */ - private final DataChecksum checksum; - - /** - * If false, we will always skip the checksum. - */ - private final boolean verifyChecksum; - - /** - * Name of the block, for logging purposes. - */ - private final String filename; - - /** - * Block ID and Block Pool ID. - */ - private final ExtendedBlock block; - - /** - * Cache of Checksum#bytesPerChecksum. - */ - private final int bytesPerChecksum; - - /** - * Cache of Checksum#checksumSize. - */ - private final int checksumSize; - - /** - * Maximum number of chunks to allocate. - * - * This is used to allocate dataBuf and checksumBuf, in the event that - * we need them. - */ - private final int maxAllocatedChunks; - - /** - * True if zero readahead was requested. - */ - private final boolean zeroReadaheadRequested; - - /** - * Maximum amount of readahead we'll do. This will always be at least the, - * size of a single chunk, even if {@link #zeroReadaheadRequested} is true. - * The reason is because we need to do a certain amount of buffering in order - * to do checksumming. - * - * This determines how many bytes we'll use out of dataBuf and checksumBuf. - * Why do we allocate buffers, and then (potentially) only use part of them? - * The rationale is that allocating a lot of buffers of different sizes would - * make it very difficult for the DirectBufferPool to re-use buffers. - */ - private final int maxReadaheadLength; - - /** - * Buffers data starting at the current dataPos and extending on - * for dataBuf.limit(). - * - * This may be null if we don't need it. - */ - private ByteBuffer dataBuf; - - /** - * Buffers checksums starting at the current checksumPos and extending on - * for checksumBuf.limit(). - * - * This may be null if we don't need it. - */ - private ByteBuffer checksumBuf; - - /** - * StorageType of replica on DataNode. - */ - private StorageType storageType; - - /** - * The Tracer to use. 
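
The buffers described above come from a shared DirectBufferPool, and the comment on maxReadaheadLength explains why they are allocated at uniform sizes. A hedged restatement of that pooled-buffer pattern, as used by the create/free helpers below:

    // Sketch only: uniform buffer sizes make pool reuse likely.
    DirectBufferPool pool = new DirectBufferPool();
    ByteBuffer buf = pool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
    try {
      // ... fill and drain buf ...
    } finally {
      pool.returnBuffer(buf);  // return for reuse rather than discarding
    }
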
- */ - private final Tracer tracer; - - private BlockReaderLocal(Builder builder) { - this.replica = builder.replica; - this.dataIn = replica.getDataStream().getChannel(); - this.dataPos = builder.dataPos; - this.checksumIn = replica.getMetaStream().getChannel(); - BlockMetadataHeader header = builder.replica.getMetaHeader(); - this.checksum = header.getChecksum(); - this.verifyChecksum = builder.verifyChecksum && - (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL); - this.filename = builder.filename; - this.block = builder.block; - this.bytesPerChecksum = checksum.getBytesPerChecksum(); - this.checksumSize = checksum.getChecksumSize(); - - this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 : - ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum); - // Calculate the effective maximum readahead. - // We can't do more readahead than there is space in the buffer. - int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 : - ((Math.min(builder.bufferSize, builder.maxReadahead) + - bytesPerChecksum - 1) / bytesPerChecksum); - if (maxReadaheadChunks == 0) { - this.zeroReadaheadRequested = true; - maxReadaheadChunks = 1; - } else { - this.zeroReadaheadRequested = false; - } - this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum; - this.storageType = builder.storageType; - this.tracer = builder.tracer; - } - - private synchronized void createDataBufIfNeeded() { - if (dataBuf == null) { - dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum); - dataBuf.position(0); - dataBuf.limit(0); - } - } - - private synchronized void freeDataBufIfExists() { - if (dataBuf != null) { - // When disposing of a dataBuf, we have to move our stored file index - // backwards. - dataPos -= dataBuf.remaining(); - dataBuf.clear(); - bufferPool.returnBuffer(dataBuf); - dataBuf = null; - } - } - - private synchronized void createChecksumBufIfNeeded() { - if (checksumBuf == null) { - checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize); - checksumBuf.position(0); - checksumBuf.limit(0); - } - } - - private synchronized void freeChecksumBufIfExists() { - if (checksumBuf != null) { - checksumBuf.clear(); - bufferPool.returnBuffer(checksumBuf); - checksumBuf = null; - } - } - - private synchronized int drainDataBuf(ByteBuffer buf) { - if (dataBuf == null) return -1; - int oldLimit = dataBuf.limit(); - int nRead = Math.min(dataBuf.remaining(), buf.remaining()); - if (nRead == 0) { - return (dataBuf.remaining() == 0) ? -1 : 0; - } - try { - dataBuf.limit(dataBuf.position() + nRead); - buf.put(dataBuf); - } finally { - dataBuf.limit(oldLimit); - } - return nRead; - } - - /** - * Read from the block file into a buffer. - * - * This function overwrites checksumBuf. It will increment dataPos. - * - * @param buf The buffer to read into. May be dataBuf. - * The position and limit of this buffer should be set to - * multiples of the checksum size. - * @param canSkipChecksum True if we can skip checksumming. - * - * @return Total bytes read. 0 on EOF. 
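
The constructor arithmetic above rounds both the buffer and the readahead up to whole checksum chunks. A worked sketch of that math with assumed illustrative values (1 MiB buffer, 512-byte chunks, 4 MiB readahead):

    int bufferSize = 1024 * 1024;        // dfs.client.read.shortcircuit.buffer.size
    int bytesPerChecksum = 512;
    int maxReadahead = 4 * 1024 * 1024;

    int maxAllocatedChunks =
        (bufferSize + bytesPerChecksum - 1) / bytesPerChecksum;       // 2048
    int maxReadaheadChunks =
        (Math.min(bufferSize, maxReadahead) + bytesPerChecksum - 1)
            / bytesPerChecksum;                                       // 2048
    // If maxReadaheadChunks came out 0 (zero readahead requested), the
    // constructor still forces one chunk, because checksum verification
    // needs at least one whole chunk buffered.
    int maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;   // 1 MiB
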
- */ - private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum) - throws IOException { - try (TraceScope ignored = tracer.newScope( - "BlockReaderLocal#fillBuffer(" + block.getBlockId() + ")")) { - int total = 0; - long startDataPos = dataPos; - int startBufPos = buf.position(); - while (buf.hasRemaining()) { - int nRead = dataIn.read(buf, dataPos); - if (nRead < 0) { - break; - } - dataPos += nRead; - total += nRead; - } - if (canSkipChecksum) { - freeChecksumBufIfExists(); - return total; - } - if (total > 0) { - try { - buf.limit(buf.position()); - buf.position(startBufPos); - createChecksumBufIfNeeded(); - int checksumsNeeded = (total + bytesPerChecksum - 1) / - bytesPerChecksum; - checksumBuf.clear(); - checksumBuf.limit(checksumsNeeded * checksumSize); - long checksumPos = BlockMetadataHeader.getHeaderSize() - + ((startDataPos / bytesPerChecksum) * checksumSize); - while (checksumBuf.hasRemaining()) { - int nRead = checksumIn.read(checksumBuf, checksumPos); - if (nRead < 0) { - throw new IOException("Got unexpected checksum file EOF at " + - checksumPos + ", block file position " + startDataPos + - " for block " + block + " of file " + filename); - } - checksumPos += nRead; - } - checksumBuf.flip(); - - checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos); - } finally { - buf.position(buf.limit()); - } - } - return total; - } - } - - private boolean createNoChecksumContext() { - return !verifyChecksum || - // Checksums are not stored for replicas on transient storage. We do - // not anchor, because we do not intend for client activity to block - // eviction from transient storage on the DataNode side. - (storageType != null && storageType.isTransient()) || - replica.addNoChecksumAnchor(); - } - - private void releaseNoChecksumContext() { - if (verifyChecksum) { - if (storageType == null || !storageType.isTransient()) { - replica.removeNoChecksumAnchor(); - } - } - } - - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - boolean canSkipChecksum = createNoChecksumContext(); - try { - String traceFormatStr = "read(buf.remaining={}, block={}, filename={}, " - + "canSkipChecksum={})"; - LOG.trace(traceFormatStr + ": starting", - buf.remaining(), block, filename, canSkipChecksum); - int nRead; - try { - if (canSkipChecksum && zeroReadaheadRequested) { - nRead = readWithoutBounceBuffer(buf); - } else { - nRead = readWithBounceBuffer(buf, canSkipChecksum); - } - } catch (IOException e) { - LOG.trace(traceFormatStr + ": I/O error", - buf.remaining(), block, filename, canSkipChecksum, e); - throw e; - } - LOG.trace(traceFormatStr + ": returning {}", - buf.remaining(), block, filename, canSkipChecksum, nRead); - return nRead; - } finally { - if (canSkipChecksum) releaseNoChecksumContext(); - } - } - - private synchronized int readWithoutBounceBuffer(ByteBuffer buf) - throws IOException { - freeDataBufIfExists(); - freeChecksumBufIfExists(); - int total = 0; - while (buf.hasRemaining()) { - int nRead = dataIn.read(buf, dataPos); - if (nRead <= 0) break; - dataPos += nRead; - total += nRead; - } - return (total == 0 && (dataPos == dataIn.size())) ? -1 : total; - } - - /** - * Fill the data buffer. If necessary, validate the data against the - * checksums. - * - * We always want the offsets of the data contained in dataBuf to be - * aligned to the chunk boundary. If we are validating checksums, we - * accomplish this by seeking backwards in the file until we're on a - * chunk boundary. 
(This is necessary because we can't checksum a - * partial chunk.) If we are not validating checksums, we simply only - * fill the latter part of dataBuf. - * - * @param canSkipChecksum true if we can skip checksumming. - * @return true if we hit EOF. - * @throws IOException - */ - private synchronized boolean fillDataBuf(boolean canSkipChecksum) - throws IOException { - createDataBufIfNeeded(); - final int slop = (int)(dataPos % bytesPerChecksum); - final long oldDataPos = dataPos; - dataBuf.limit(maxReadaheadLength); - if (canSkipChecksum) { - dataBuf.position(slop); - fillBuffer(dataBuf, true); - } else { - dataPos -= slop; - dataBuf.position(0); - fillBuffer(dataBuf, false); - } - dataBuf.limit(dataBuf.position()); - dataBuf.position(Math.min(dataBuf.position(), slop)); - LOG.trace("loaded {} bytes into bounce buffer from offset {} of {}", - dataBuf.remaining(), oldDataPos, block); - return dataBuf.limit() != maxReadaheadLength; - } - - /** - * Read using the bounce buffer. - * - * A 'direct' read actually has three phases. The first drains any - * remaining bytes from the slow read buffer. After this the read is - * guaranteed to be on a checksum chunk boundary. If there are still bytes - * to read, the fast direct path is used for as many remaining bytes as - * possible, up to a multiple of the checksum chunk size. Finally, any - * 'odd' bytes remaining at the end of the read cause another slow read to - * be issued, which involves an extra copy. - * - * Every 'slow' read tries to fill the slow read buffer in one go for - * efficiency's sake. As described above, all non-checksum-chunk-aligned - * reads will be served from the slower read path. - * - * @param buf The buffer to read into. - * @param canSkipChecksum True if we can skip checksums. - */ - private synchronized int readWithBounceBuffer(ByteBuffer buf, - boolean canSkipChecksum) throws IOException { - int total = 0; - int bb = drainDataBuf(buf); // drain bounce buffer if possible - if (bb >= 0) { - total += bb; - if (buf.remaining() == 0) return total; - } - boolean eof = true, done = false; - do { - if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength) - && ((dataPos % bytesPerChecksum) == 0)) { - // Fast lane: try to read directly into user-supplied buffer, bypassing - // bounce buffer. - int oldLimit = buf.limit(); - int nRead; - try { - buf.limit(buf.position() + maxReadaheadLength); - nRead = fillBuffer(buf, canSkipChecksum); - } finally { - buf.limit(oldLimit); - } - if (nRead < maxReadaheadLength) { - done = true; - } - if (nRead > 0) { - eof = false; - } - total += nRead; - } else { - // Slow lane: refill bounce buffer. - if (fillDataBuf(canSkipChecksum)) { - done = true; - } - bb = drainDataBuf(buf); // drain bounce buffer if possible - if (bb >= 0) { - eof = false; - total += bb; - } - } - } while ((!done) && (buf.remaining() > 0)); - return (eof && total == 0) ? 
-1 : total; - } - - @Override - public synchronized int read(byte[] arr, int off, int len) - throws IOException { - boolean canSkipChecksum = createNoChecksumContext(); - int nRead; - try { - final String traceFormatStr = "read(arr.length={}, off={}, len={}, " - + "filename={}, block={}, canSkipChecksum={})"; - LOG.trace(traceFormatStr + ": starting", - arr.length, off, len, filename, block, canSkipChecksum); - try { - if (canSkipChecksum && zeroReadaheadRequested) { - nRead = readWithoutBounceBuffer(arr, off, len); - } else { - nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum); - } - } catch (IOException e) { - LOG.trace(traceFormatStr + ": I/O error", - arr.length, off, len, filename, block, canSkipChecksum, e); - throw e; - } - LOG.trace(traceFormatStr + ": returning {}", - arr.length, off, len, filename, block, canSkipChecksum, nRead); - } finally { - if (canSkipChecksum) releaseNoChecksumContext(); - } - return nRead; - } - - private synchronized int readWithoutBounceBuffer(byte arr[], int off, - int len) throws IOException { - freeDataBufIfExists(); - freeChecksumBufIfExists(); - int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos); - if (nRead > 0) { - dataPos += nRead; - } else if ((nRead == 0) && (dataPos == dataIn.size())) { - return -1; - } - return nRead; - } - - private synchronized int readWithBounceBuffer(byte arr[], int off, int len, - boolean canSkipChecksum) throws IOException { - createDataBufIfNeeded(); - if (!dataBuf.hasRemaining()) { - dataBuf.position(0); - dataBuf.limit(maxReadaheadLength); - fillDataBuf(canSkipChecksum); - } - if (dataBuf.remaining() == 0) return -1; - int toRead = Math.min(dataBuf.remaining(), len); - dataBuf.get(arr, off, toRead); - return toRead; - } - - @Override - public synchronized long skip(long n) throws IOException { - int discardedFromBuf = 0; - long remaining = n; - if ((dataBuf != null) && dataBuf.hasRemaining()) { - discardedFromBuf = (int)Math.min(dataBuf.remaining(), n); - dataBuf.position(dataBuf.position() + discardedFromBuf); - remaining -= discardedFromBuf; - } - LOG.trace("skip(n={}, block={}, filename={}): discarded {} bytes from " - + "dataBuf and advanced dataPos by {}", - n, block, filename, discardedFromBuf, remaining); - dataPos += remaining; - return n; - } - - @Override - public int available() { - // We never do network I/O in BlockReaderLocal. - return Integer.MAX_VALUE; - } - - @Override - public synchronized void close() throws IOException { - if (closed) return; - closed = true; - LOG.trace("close(filename={}, block={})", filename, block); - replica.unref(); - freeDataBufIfExists(); - freeChecksumBufIfExists(); - } - - @Override - public synchronized void readFully(byte[] arr, int off, int len) - throws IOException { - BlockReaderUtil.readFully(this, arr, off, len); - } - - @Override - public synchronized int readAll(byte[] buf, int off, int len) - throws IOException { - return BlockReaderUtil.readAll(this, buf, off, len); - } - - @Override - public boolean isShortCircuit() { - return true; - } - - /** - * Get or create a memory map for this replica. - * - * There are two kinds of ClientMmap objects we could fetch here: one that - * will always read pre-checksummed data, and one that may read data that - * hasn't been checksummed. - * - * If we fetch the former, "safe" kind of ClientMmap, we have to increment - * the anchor count on the shared memory slot. This will tell the DataNode - * not to munlock the block until this ClientMmap is closed. 
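
The mmap machinery described here backs the public zero-copy read API on FSDataInputStream. A hedged sketch of that client-side usage; fs and path are assumed caller-supplied values, and the calls shown are the standard Hadoop zero-copy read API:

    // import java.nio.ByteBuffer; java.util.EnumSet;
    // org.apache.hadoop.fs.{FSDataInputStream, ReadOption};
    // org.apache.hadoop.io.ElasticByteBufferPool;
    try (FSDataInputStream in = fs.open(path)) {
      ByteBuffer buf = in.read(new ElasticByteBufferPool(), 1024 * 1024,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));  // may be served by an mmap
      if (buf != null) {                           // null at EOF
        try {
          // ... consume buf ...
        } finally {
          in.releaseBuffer(buf);                   // release the mmap/buffer
        }
      }
    }
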
- * If we fetch the latter, we don't bother with anchoring. - * - * @param opts The options to use, such as SKIP_CHECKSUMS. - * - * @return null on failure; the ClientMmap otherwise. - */ - @Override - public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { - boolean anchor = verifyChecksum && - !opts.contains(ReadOption.SKIP_CHECKSUMS); - if (anchor) { - if (!createNoChecksumContext()) { - LOG.trace("can't get an mmap for {} of {} since SKIP_CHECKSUMS was not " - + "given, we aren't skipping checksums, and the block is not " - + "mlocked.", block, filename); - return null; - } - } - ClientMmap clientMmap = null; - try { - clientMmap = replica.getOrCreateClientMmap(anchor); - } finally { - if ((clientMmap == null) && anchor) { - releaseNoChecksumContext(); - } - } - return clientMmap; - } - - @VisibleForTesting - boolean getVerifyChecksum() { - return this.verifyChecksum; - } - - @VisibleForTesting - int getMaxReadaheadLength() { - return this.maxReadaheadLength; - } - - /** - * Make the replica anchorable. Normally this can only be done by the - * DataNode. This method is only for testing. - */ - @VisibleForTesting - void forceAnchorable() { - replica.getSlot().makeAnchorable(); - } - - /** - * Make the replica unanchorable. Normally this can only be done by the - * DataNode. This method is only for testing. - */ - @VisibleForTesting - void forceUnanchorable() { - replica.getSlot().makeUnanchorable(); - } - - @Override - public int getNetworkDistance() { - return 0; - } -}
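
That closes BlockReaderLocal. For reference, a hedged sketch of how getBlockReaderLocal() in BlockReaderFactory, earlier in this diff, drives the Builder above; all arguments are assumed caller-supplied values, and the class is package-private, so this mirrors in-package use only:

    BlockReaderLocal reader = new BlockReaderLocal.Builder(shortCircuitConf)
        .setFilename(fileName)
        .setBlock(block)
        .setStartOffset(startOffset)
        .setShortCircuitReplica(replica)   // build() requires non-null
        .setVerifyChecksum(true)
        .setCachingStrategy(cachingStrategy)
        .setStorageType(StorageType.DISK)  // assumed storage type
        .setTracer(tracer)
        .build();
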
