http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java deleted file mode 100644 index c9add53..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ /dev/null @@ -1,892 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; - -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.net.InetSocketAddress; -import java.util.List; - -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; -import org.apache.commons.lang.mutable.MutableBoolean; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; -import org.apache.hadoop.hdfs.net.DomainPeer; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.PerformanceAdvisory; -import org.apache.hadoop.util.Time; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - - -/** - * Utility class to create BlockReader implementations. - */ [email protected] -public class BlockReaderFactory implements ShortCircuitReplicaCreator { - static final Log LOG = LogFactory.getLog(BlockReaderFactory.class); - - public static class FailureInjector { - public void injectRequestFileDescriptorsFailure() throws IOException { - // do nothing - } - public boolean getSupportsReceiptVerification() { - return true; - } - } - - @VisibleForTesting - static ShortCircuitReplicaCreator - createShortCircuitReplicaInfoCallback = null; - - private final DfsClientConf conf; - - /** - * Injects failures into specific operations during unit tests. - */ - private static FailureInjector failureInjector = new FailureInjector(); - - /** - * The file name, for logging and debugging purposes. - */ - private String fileName; - - /** - * The block ID and block pool ID to use. - */ - private ExtendedBlock block; - - /** - * The block token to use for security purposes. - */ - private Token<BlockTokenIdentifier> token; - - /** - * The offset within the block to start reading at. - */ - private long startOffset; - - /** - * If false, we won't try to verify the block checksum. - */ - private boolean verifyChecksum; - - /** - * The name of this client. - */ - private String clientName; - - /** - * The DataNode we're talking to. - */ - private DatanodeInfo datanode; - - /** - * StorageType of replica on DataNode. - */ - private StorageType storageType; - - /** - * If false, we won't try short-circuit local reads. - */ - private boolean allowShortCircuitLocalReads; - - /** - * The ClientContext to use for things like the PeerCache. - */ - private ClientContext clientContext; - - /** - * Number of bytes to read. -1 indicates no limit. - */ - private long length = -1; - - /** - * Caching strategy to use when reading the block. - */ - private CachingStrategy cachingStrategy; - - /** - * Socket address to use to connect to peer. - */ - private InetSocketAddress inetSocketAddress; - - /** - * Remote peer factory to use to create a peer, if needed. - */ - private RemotePeerFactory remotePeerFactory; - - /** - * UserGroupInformation to use for legacy block reader local objects, if needed. - */ - private UserGroupInformation userGroupInformation; - - /** - * Configuration to use for legacy block reader local objects, if needed. - */ - private Configuration configuration; - - /** - * Information about the domain socket path we should use to connect to the - * local peer-- or null if we haven't examined the local domain socket. - */ - private DomainSocketFactory.PathInfo pathInfo; - - /** - * The remaining number of times that we'll try to pull a socket out of the - * cache. - */ - private int remainingCacheTries; - - public BlockReaderFactory(DfsClientConf conf) { - this.conf = conf; - this.remainingCacheTries = conf.getNumCachedConnRetry(); - } - - public BlockReaderFactory setFileName(String fileName) { - this.fileName = fileName; - return this; - } - - public BlockReaderFactory setBlock(ExtendedBlock block) { - this.block = block; - return this; - } - - public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) { - this.token = token; - return this; - } - - public BlockReaderFactory setStartOffset(long startOffset) { - this.startOffset = startOffset; - return this; - } - - public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) { - this.verifyChecksum = verifyChecksum; - return this; - } - - public BlockReaderFactory setClientName(String clientName) { - this.clientName = clientName; - return this; - } - - public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) { - this.datanode = datanode; - return this; - } - - public BlockReaderFactory setStorageType(StorageType storageType) { - this.storageType = storageType; - return this; - } - - public BlockReaderFactory setAllowShortCircuitLocalReads( - boolean allowShortCircuitLocalReads) { - this.allowShortCircuitLocalReads = allowShortCircuitLocalReads; - return this; - } - - public BlockReaderFactory setClientCacheContext( - ClientContext clientContext) { - this.clientContext = clientContext; - return this; - } - - public BlockReaderFactory setLength(long length) { - this.length = length; - return this; - } - - public BlockReaderFactory setCachingStrategy( - CachingStrategy cachingStrategy) { - this.cachingStrategy = cachingStrategy; - return this; - } - - public BlockReaderFactory setInetSocketAddress ( - InetSocketAddress inetSocketAddress) { - this.inetSocketAddress = inetSocketAddress; - return this; - } - - public BlockReaderFactory setUserGroupInformation( - UserGroupInformation userGroupInformation) { - this.userGroupInformation = userGroupInformation; - return this; - } - - public BlockReaderFactory setRemotePeerFactory( - RemotePeerFactory remotePeerFactory) { - this.remotePeerFactory = remotePeerFactory; - return this; - } - - public BlockReaderFactory setConfiguration( - Configuration configuration) { - this.configuration = configuration; - return this; - } - - @VisibleForTesting - public static void setFailureInjectorForTesting(FailureInjector injector) { - failureInjector = injector; - } - - /** - * Build a BlockReader with the given options. - * - * This function will do the best it can to create a block reader that meets - * all of our requirements. We prefer short-circuit block readers - * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the - * former avoid the overhead of socket communication. If short-circuit is - * unavailable, our next fallback is data transfer over UNIX domain sockets, - * if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't - * work, we will try to create a remote block reader that operates over TCP - * sockets. - * - * There are a few caches that are important here. - * - * The ShortCircuitCache stores file descriptor objects which have been passed - * from the DataNode. - * - * The DomainSocketFactory stores information about UNIX domain socket paths - * that we not been able to use in the past, so that we don't waste time - * retrying them over and over. (Like all the caches, it does have a timeout, - * though.) - * - * The PeerCache stores peers that we have used in the past. If we can reuse - * one of these peers, we avoid the overhead of re-opening a socket. However, - * if the socket has been timed out on the remote end, our attempt to reuse - * the socket may end with an IOException. For that reason, we limit our - * attempts at socket reuse to dfs.client.cached.conn.retry times. After - * that, we create new sockets. This avoids the problem where a thread tries - * to talk to a peer that it hasn't talked to in a while, and has to clean out - * every entry in a socket cache full of stale entries. - * - * @return The new BlockReader. We will not return null. - * - * @throws InvalidToken - * If the block token was invalid. - * InvalidEncryptionKeyException - * If the encryption key was invalid. - * Other IOException - * If there was another problem. - */ - public BlockReader build() throws IOException { - Preconditions.checkNotNull(configuration); - BlockReader reader = tryToCreateExternalBlockReader(); - if (reader != null) { - return reader; - } - final ShortCircuitConf scConf = conf.getShortCircuitConf(); - if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) { - if (clientContext.getUseLegacyBlockReaderLocal()) { - reader = getLegacyBlockReaderLocal(); - if (reader != null) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": returning new legacy block reader local."); - } - return reader; - } - } else { - reader = getBlockReaderLocal(); - if (reader != null) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": returning new block reader local."); - } - return reader; - } - } - } - if (scConf.isDomainSocketDataTraffic()) { - reader = getRemoteBlockReaderFromDomain(); - if (reader != null) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": returning new remote block reader using " + - "UNIX domain socket on " + pathInfo.getPath()); - } - return reader; - } - } - Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting, - "TCP reads were disabled for testing, but we failed to " + - "do a non-TCP read."); - return getRemoteBlockReaderFromTcp(); - } - - private BlockReader tryToCreateExternalBlockReader() { - List<Class<? extends ReplicaAccessorBuilder>> clses = - conf.getReplicaAccessorBuilderClasses(); - for (Class<? extends ReplicaAccessorBuilder> cls : clses) { - try { - ByteArrayDataOutput bado = ByteStreams.newDataOutput(); - token.write(bado); - byte tokenBytes[] = bado.toByteArray(); - - Constructor<? extends ReplicaAccessorBuilder> ctor = - cls.getConstructor(); - ReplicaAccessorBuilder builder = ctor.newInstance(); - ReplicaAccessor accessor = builder. - setAllowShortCircuitReads(allowShortCircuitLocalReads). - setBlock(block.getBlockId(), block.getBlockPoolId()). - setGenerationStamp(block.getGenerationStamp()). - setBlockAccessToken(tokenBytes). - setClientName(clientName). - setConfiguration(configuration). - setFileName(fileName). - setVerifyChecksum(verifyChecksum). - setVisibleLength(length). - build(); - if (accessor == null) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": No ReplicaAccessor created by " + - cls.getName()); - } - } else { - return new ExternalBlockReader(accessor, length, startOffset); - } - } catch (Throwable t) { - LOG.warn("Failed to construct new object of type " + - cls.getName(), t); - } - } - return null; - } - - - /** - * Get {@link BlockReaderLocalLegacy} for short circuited local reads. - * This block reader implements the path-based style of local reads - * first introduced in HDFS-2246. - */ - private BlockReader getLegacyBlockReaderLocal() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": trying to construct BlockReaderLocalLegacy"); - } - if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " + - "the address " + inetSocketAddress + " is not local"); - } - return null; - } - if (clientContext.getDisableLegacyBlockReaderLocal()) { - PerformanceAdvisory.LOG.debug("{}: can't construct " + - "BlockReaderLocalLegacy because " + - "disableLegacyBlockReaderLocal is set.", this); - return null; - } - IOException ioe; - try { - return BlockReaderLocalLegacy.newBlockReader(conf, - userGroupInformation, configuration, fileName, block, token, - datanode, startOffset, length, storageType); - } catch (RemoteException remoteException) { - ioe = remoteException.unwrapRemoteException( - InvalidToken.class, AccessControlException.class); - } catch (IOException e) { - ioe = e; - } - if ((!(ioe instanceof AccessControlException)) && - isSecurityException(ioe)) { - // Handle security exceptions. - // We do not handle AccessControlException here, since - // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate - // that the user is not in dfs.block.local-path-access.user, a condition - // which requires us to disable legacy SCR. - throw ioe; - } - LOG.warn(this + ": error creating legacy BlockReaderLocal. " + - "Disabling legacy local reads.", ioe); - clientContext.setDisableLegacyBlockReaderLocal(); - return null; - } - - private BlockReader getBlockReaderLocal() throws InvalidToken { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": trying to construct a BlockReaderLocal " + - "for short-circuit reads."); - } - if (pathInfo == null) { - pathInfo = clientContext.getDomainSocketFactory() - .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); - } - if (!pathInfo.getPathState().getUsableForShortCircuit()) { - PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " + - "giving up on BlockReaderLocal.", this, pathInfo); - return null; - } - ShortCircuitCache cache = clientContext.getShortCircuitCache(); - ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); - ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); - InvalidToken exc = info.getInvalidTokenException(); - if (exc != null) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": got InvalidToken exception while trying to " + - "construct BlockReaderLocal via " + pathInfo.getPath()); - } - throw exc; - } - if (info.getReplica() == null) { - PerformanceAdvisory.LOG.debug("{}: failed to get " + - "ShortCircuitReplica. Cannot construct " + - "BlockReaderLocal via {}", this, pathInfo.getPath()); - return null; - } - return new BlockReaderLocal.Builder(conf.getShortCircuitConf()). - setFilename(fileName). - setBlock(block). - setStartOffset(startOffset). - setShortCircuitReplica(info.getReplica()). - setVerifyChecksum(verifyChecksum). - setCachingStrategy(cachingStrategy). - setStorageType(storageType). - build(); - } - - /** - * Fetch a pair of short-circuit block descriptors from a local DataNode. - * - * @return Null if we could not communicate with the datanode, - * a new ShortCircuitReplicaInfo object otherwise. - * ShortCircuitReplicaInfo objects may contain either an InvalidToken - * exception, or a ShortCircuitReplica object ready to use. - */ - @Override - public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { - if (createShortCircuitReplicaInfoCallback != null) { - ShortCircuitReplicaInfo info = - createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo(); - if (info != null) return info; - } - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": trying to create ShortCircuitReplicaInfo."); - } - BlockReaderPeer curPeer; - while (true) { - curPeer = nextDomainPeer(); - if (curPeer == null) break; - if (curPeer.fromCache) remainingCacheTries--; - DomainPeer peer = (DomainPeer)curPeer.peer; - Slot slot = null; - ShortCircuitCache cache = clientContext.getShortCircuitCache(); - try { - MutableBoolean usedPeer = new MutableBoolean(false); - slot = cache.allocShmSlot(datanode, peer, usedPeer, - new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()), - clientName); - if (usedPeer.booleanValue()) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": allocShmSlot used up our previous socket " + - peer.getDomainSocket() + ". Allocating a new one..."); - } - curPeer = nextDomainPeer(); - if (curPeer == null) break; - peer = (DomainPeer)curPeer.peer; - } - ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot); - clientContext.getPeerCache().put(datanode, peer); - return info; - } catch (IOException e) { - if (slot != null) { - cache.freeSlot(slot); - } - if (curPeer.fromCache) { - // Handle an I/O error we got when using a cached socket. - // These are considered less serious, because the socket may be stale. - if (LOG.isDebugEnabled()) { - LOG.debug(this + ": closing stale domain peer " + peer, e); - } - IOUtils.cleanup(LOG, peer); - } else { - // Handle an I/O error we got when using a newly created socket. - // We temporarily disable the domain socket path for a few minutes in - // this case, to prevent wasting more time on it. - LOG.warn(this + ": I/O error requesting file descriptors. " + - "Disabling domain socket " + peer.getDomainSocket(), e); - IOUtils.cleanup(LOG, peer); - clientContext.getDomainSocketFactory() - .disableDomainSocketPath(pathInfo.getPath()); - return null; - } - } - } - return null; - } - - /** - * Request file descriptors from a DomainPeer. - * - * @param peer The peer to use for communication. - * @param slot If non-null, the shared memory slot to associate with the - * new ShortCircuitReplica. - * - * @return A ShortCircuitReplica object if we could communicate with the - * datanode; null, otherwise. - * @throws IOException If we encountered an I/O exception while communicating - * with the datanode. - */ - private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, - Slot slot) throws IOException { - ShortCircuitCache cache = clientContext.getShortCircuitCache(); - final DataOutputStream out = - new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); - SlotId slotId = slot == null ? null : slot.getSlotId(); - new Sender(out).requestShortCircuitFds(block, token, slotId, 1, - failureInjector.getSupportsReceiptVerification()); - DataInputStream in = new DataInputStream(peer.getInputStream()); - BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); - DomainSocket sock = peer.getDomainSocket(); - failureInjector.injectRequestFileDescriptorsFailure(); - switch (resp.getStatus()) { - case SUCCESS: - byte buf[] = new byte[1]; - FileInputStream fis[] = new FileInputStream[2]; - sock.recvFileInputStreams(fis, buf, 0, buf.length); - ShortCircuitReplica replica = null; - try { - ExtendedBlockId key = - new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); - if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) { - LOG.trace("Sending receipt verification byte for slot " + slot); - sock.getOutputStream().write(0); - } - replica = new ShortCircuitReplica(key, fis[0], fis[1], cache, - Time.monotonicNow(), slot); - return new ShortCircuitReplicaInfo(replica); - } catch (IOException e) { - // This indicates an error reading from disk, or a format error. Since - // it's not a socket communication problem, we return null rather than - // throwing an exception. - LOG.warn(this + ": error creating ShortCircuitReplica.", e); - return null; - } finally { - if (replica == null) { - IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]); - } - } - case ERROR_UNSUPPORTED: - if (!resp.hasShortCircuitAccessVersion()) { - LOG.warn("short-circuit read access is disabled for " + - "DataNode " + datanode + ". reason: " + resp.getMessage()); - clientContext.getDomainSocketFactory() - .disableShortCircuitForPath(pathInfo.getPath()); - } else { - LOG.warn("short-circuit read access for the file " + - fileName + " is disabled for DataNode " + datanode + - ". reason: " + resp.getMessage()); - } - return null; - case ERROR_ACCESS_TOKEN: - String msg = "access control error while " + - "attempting to set up short-circuit access to " + - fileName + resp.getMessage(); - if (LOG.isDebugEnabled()) { - LOG.debug(this + ":" + msg); - } - return new ShortCircuitReplicaInfo(new InvalidToken(msg)); - default: - LOG.warn(this + ": unknown response code " + resp.getStatus() + - " while attempting to set up short-circuit access. " + - resp.getMessage()); - clientContext.getDomainSocketFactory() - .disableShortCircuitForPath(pathInfo.getPath()); - return null; - } - } - - /** - * Get a RemoteBlockReader that communicates over a UNIX domain socket. - * - * @return The new BlockReader, or null if we failed to create the block - * reader. - * - * @throws InvalidToken If the block token was invalid. - * Potentially other security-related execptions. - */ - private BlockReader getRemoteBlockReaderFromDomain() throws IOException { - if (pathInfo == null) { - pathInfo = clientContext.getDomainSocketFactory() - .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); - } - if (!pathInfo.getPathState().getUsableForDataTransfer()) { - PerformanceAdvisory.LOG.debug("{}: not trying to create a " + - "remote block reader because the UNIX domain socket at {}" + - " is not usable.", this, pathInfo); - return null; - } - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": trying to create a remote block reader from the " + - "UNIX domain socket at " + pathInfo.getPath()); - } - - while (true) { - BlockReaderPeer curPeer = nextDomainPeer(); - if (curPeer == null) break; - if (curPeer.fromCache) remainingCacheTries--; - DomainPeer peer = (DomainPeer)curPeer.peer; - BlockReader blockReader = null; - try { - blockReader = getRemoteBlockReader(peer); - return blockReader; - } catch (IOException ioe) { - IOUtils.cleanup(LOG, peer); - if (isSecurityException(ioe)) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": got security exception while constructing " + - "a remote block reader from the unix domain socket at " + - pathInfo.getPath(), ioe); - } - throw ioe; - } - if (curPeer.fromCache) { - // Handle an I/O error we got when using a cached peer. These are - // considered less serious, because the underlying socket may be stale. - if (LOG.isDebugEnabled()) { - LOG.debug("Closed potentially stale domain peer " + peer, ioe); - } - } else { - // Handle an I/O error we got when using a newly created domain peer. - // We temporarily disable the domain socket path for a few minutes in - // this case, to prevent wasting more time on it. - LOG.warn("I/O error constructing remote block reader. Disabling " + - "domain socket " + peer.getDomainSocket(), ioe); - clientContext.getDomainSocketFactory() - .disableDomainSocketPath(pathInfo.getPath()); - return null; - } - } finally { - if (blockReader == null) { - IOUtils.cleanup(LOG, peer); - } - } - } - return null; - } - - /** - * Get a RemoteBlockReader that communicates over a TCP socket. - * - * @return The new BlockReader. We will not return null, but instead throw - * an exception if this fails. - * - * @throws InvalidToken - * If the block token was invalid. - * InvalidEncryptionKeyException - * If the encryption key was invalid. - * Other IOException - * If there was another problem. - */ - private BlockReader getRemoteBlockReaderFromTcp() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": trying to create a remote block reader from a " + - "TCP socket"); - } - BlockReader blockReader = null; - while (true) { - BlockReaderPeer curPeer = null; - Peer peer = null; - try { - curPeer = nextTcpPeer(); - if (curPeer.fromCache) remainingCacheTries--; - peer = curPeer.peer; - blockReader = getRemoteBlockReader(peer); - return blockReader; - } catch (IOException ioe) { - if (isSecurityException(ioe)) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": got security exception while constructing " + - "a remote block reader from " + peer, ioe); - } - throw ioe; - } - if ((curPeer != null) && curPeer.fromCache) { - // Handle an I/O error we got when using a cached peer. These are - // considered less serious, because the underlying socket may be - // stale. - if (LOG.isDebugEnabled()) { - LOG.debug("Closed potentially stale remote peer " + peer, ioe); - } - } else { - // Handle an I/O error we got when using a newly created peer. - LOG.warn("I/O error constructing remote block reader.", ioe); - throw ioe; - } - } finally { - if (blockReader == null) { - IOUtils.cleanup(LOG, peer); - } - } - } - } - - public static class BlockReaderPeer { - final Peer peer; - final boolean fromCache; - - BlockReaderPeer(Peer peer, boolean fromCache) { - this.peer = peer; - this.fromCache = fromCache; - } - } - - /** - * Get the next DomainPeer-- either from the cache or by creating it. - * - * @return the next DomainPeer, or null if we could not construct one. - */ - private BlockReaderPeer nextDomainPeer() { - if (remainingCacheTries > 0) { - Peer peer = clientContext.getPeerCache().get(datanode, true); - if (peer != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("nextDomainPeer: reusing existing peer " + peer); - } - return new BlockReaderPeer(peer, true); - } - } - DomainSocket sock = clientContext.getDomainSocketFactory(). - createSocket(pathInfo, conf.getSocketTimeout()); - if (sock == null) return null; - return new BlockReaderPeer(new DomainPeer(sock), false); - } - - /** - * Get the next TCP-based peer-- either from the cache or by creating it. - * - * @return the next Peer, or null if we could not construct one. - * - * @throws IOException If there was an error while constructing the peer - * (such as an InvalidEncryptionKeyException) - */ - private BlockReaderPeer nextTcpPeer() throws IOException { - if (remainingCacheTries > 0) { - Peer peer = clientContext.getPeerCache().get(datanode, false); - if (peer != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("nextTcpPeer: reusing existing peer " + peer); - } - return new BlockReaderPeer(peer, true); - } - } - try { - Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token, - datanode); - if (LOG.isTraceEnabled()) { - LOG.trace("nextTcpPeer: created newConnectedPeer " + peer); - } - return new BlockReaderPeer(peer, false); - } catch (IOException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("nextTcpPeer: failed to create newConnectedPeer " + - "connected to " + datanode); - } - throw e; - } - } - - /** - * Determine if an exception is security-related. - * - * We need to handle these exceptions differently than other IOExceptions. - * They don't indicate a communication problem. Instead, they mean that there - * is some action the client needs to take, such as refetching block tokens, - * renewing encryption keys, etc. - * - * @param ioe The exception - * @return True only if the exception is security-related. - */ - private static boolean isSecurityException(IOException ioe) { - return (ioe instanceof InvalidToken) || - (ioe instanceof InvalidEncryptionKeyException) || - (ioe instanceof InvalidBlockTokenException) || - (ioe instanceof AccessControlException); - } - - @SuppressWarnings("deprecation") - private BlockReader getRemoteBlockReader(Peer peer) throws IOException { - if (conf.getShortCircuitConf().isUseLegacyBlockReader()) { - return RemoteBlockReader.newBlockReader(fileName, - block, token, startOffset, length, conf.getIoBufferSize(), - verifyChecksum, clientName, peer, datanode, - clientContext.getPeerCache(), cachingStrategy); - } else { - return RemoteBlockReader2.newBlockReader( - fileName, block, token, startOffset, length, - verifyChecksum, clientName, peer, datanode, - clientContext.getPeerCache(), cachingStrategy); - } - } - - @Override - public String toString() { - return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")"; - } - - /** - * File name to print when accessing a block directly (from servlets) - * @param s Address of the block location - * @param poolId Block pool ID of the block - * @param blockId Block ID of the block - * @return string that has a file name for debug purposes - */ - public static String getFileName(final InetSocketAddress s, - final String poolId, final long blockId) { - return s.toString() + ":" + poolId + ":" + blockId; - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java deleted file mode 100644 index cac5366..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java +++ /dev/null @@ -1,369 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.BlockStorageLocation; -import org.apache.hadoop.fs.HdfsVolumeId; -import org.apache.hadoop.fs.VolumeId; -import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.token.Token; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - [email protected] [email protected] -class BlockStorageLocationUtil { - - static final Log LOG = LogFactory - .getLog(BlockStorageLocationUtil.class); - - /** - * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set - * of datanodes and blocks. The blocks must all correspond to the same - * block pool. - * - * @param datanodeBlocks - * Map of datanodes to block replicas at each datanode - * @return callables Used to query each datanode for location information on - * the block replicas at the datanode - */ - private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables( - Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks, - int timeout, boolean connectToDnViaHostname, Span parent) { - - if (datanodeBlocks.isEmpty()) { - return Lists.newArrayList(); - } - - // Construct the callables, one per datanode - List<VolumeBlockLocationCallable> callables = - new ArrayList<VolumeBlockLocationCallable>(); - for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks - .entrySet()) { - // Construct RPC parameters - DatanodeInfo datanode = entry.getKey(); - List<LocatedBlock> locatedBlocks = entry.getValue(); - if (locatedBlocks.isEmpty()) { - continue; - } - - // Ensure that the blocks all are from the same block pool. - String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId(); - for (LocatedBlock lb : locatedBlocks) { - if (!poolId.equals(lb.getBlock().getBlockPoolId())) { - throw new IllegalArgumentException( - "All blocks to be queried must be in the same block pool: " + - locatedBlocks.get(0).getBlock() + " and " + lb + - " are from different pools."); - } - } - - long[] blockIds = new long[locatedBlocks.size()]; - int i = 0; - List<Token<BlockTokenIdentifier>> dnTokens = - new ArrayList<Token<BlockTokenIdentifier>>( - locatedBlocks.size()); - for (LocatedBlock b : locatedBlocks) { - blockIds[i++] = b.getBlock().getBlockId(); - dnTokens.add(b.getBlockToken()); - } - VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable( - conf, datanode, poolId, blockIds, dnTokens, timeout, - connectToDnViaHostname, parent); - callables.add(callable); - } - return callables; - } - - /** - * Queries datanodes for the blocks specified in <code>datanodeBlocks</code>, - * making one RPC to each datanode. These RPCs are made in parallel using a - * threadpool. - * - * @param datanodeBlocks - * Map of datanodes to the blocks present on the DN - * @return metadatas Map of datanodes to block metadata of the DN - * @throws InvalidBlockTokenException - * if client does not have read access on a requested block - */ - static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata( - Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks, - int poolsize, int timeoutMs, boolean connectToDnViaHostname) - throws InvalidBlockTokenException { - - List<VolumeBlockLocationCallable> callables = - createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs, - connectToDnViaHostname, Trace.currentSpan()); - - // Use a thread pool to execute the Callables in parallel - List<Future<HdfsBlocksMetadata>> futures = - new ArrayList<Future<HdfsBlocksMetadata>>(); - ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize); - try { - futures = executor.invokeAll(callables, timeoutMs, - TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // Swallow the exception here, because we can return partial results - } - executor.shutdown(); - - Map<DatanodeInfo, HdfsBlocksMetadata> metadatas = - Maps.newHashMapWithExpectedSize(datanodeBlocks.size()); - // Fill in metadatas with results from DN RPCs, where possible - for (int i = 0; i < futures.size(); i++) { - VolumeBlockLocationCallable callable = callables.get(i); - DatanodeInfo datanode = callable.getDatanodeInfo(); - Future<HdfsBlocksMetadata> future = futures.get(i); - try { - HdfsBlocksMetadata metadata = future.get(); - metadatas.put(callable.getDatanodeInfo(), metadata); - } catch (CancellationException e) { - LOG.info("Cancelled while waiting for datanode " - + datanode.getIpcAddr(false) + ": " + e.toString()); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof InvalidBlockTokenException) { - LOG.warn("Invalid access token when trying to retrieve " - + "information from datanode " + datanode.getIpcAddr(false)); - throw (InvalidBlockTokenException) t; - } - else if (t instanceof UnsupportedOperationException) { - LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support" - + " required #getHdfsBlocksMetadata() API"); - throw (UnsupportedOperationException) t; - } else { - LOG.info("Failed to query block locations on datanode " + - datanode.getIpcAddr(false) + ": " + t); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Could not fetch information from datanode", t); - } - } catch (InterruptedException e) { - // Shouldn't happen, because invokeAll waits for all Futures to be ready - LOG.info("Interrupted while fetching HdfsBlocksMetadata"); - } - } - - return metadatas; - } - - /** - * Group the per-replica {@link VolumeId} info returned from - * {@link DFSClient#queryDatanodesForHdfsBlocksMetadata(Map)} to be - * associated - * with the corresponding {@link LocatedBlock}. - * - * @param blocks - * Original LocatedBlock array - * @param metadatas - * VolumeId information for the replicas on each datanode - * @return blockVolumeIds per-replica VolumeId information associated with the - * parent LocatedBlock - */ - static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks( - List<LocatedBlock> blocks, - Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) { - - // Initialize mapping of ExtendedBlock to LocatedBlock. - // Used to associate results from DN RPCs to the parent LocatedBlock - Map<Long, LocatedBlock> blockIdToLocBlock = - new HashMap<Long, LocatedBlock>(); - for (LocatedBlock b : blocks) { - blockIdToLocBlock.put(b.getBlock().getBlockId(), b); - } - - // Initialize the mapping of blocks -> list of VolumeIds, one per replica - // This is filled out with real values from the DN RPCs - Map<LocatedBlock, List<VolumeId>> blockVolumeIds = - new HashMap<LocatedBlock, List<VolumeId>>(); - for (LocatedBlock b : blocks) { - ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length); - for (int i = 0; i < b.getLocations().length; i++) { - l.add(null); - } - blockVolumeIds.put(b, l); - } - - // Iterate through the list of metadatas (one per datanode). - // For each metadata, if it's valid, insert its volume location information - // into the Map returned to the caller - for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) { - DatanodeInfo datanode = entry.getKey(); - HdfsBlocksMetadata metadata = entry.getValue(); - // Check if metadata is valid - if (metadata == null) { - continue; - } - long[] metaBlockIds = metadata.getBlockIds(); - List<byte[]> metaVolumeIds = metadata.getVolumeIds(); - List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes(); - // Add VolumeId for each replica in the HdfsBlocksMetadata - for (int j = 0; j < metaBlockIds.length; j++) { - int volumeIndex = metaVolumeIndexes.get(j); - long blockId = metaBlockIds[j]; - // Skip if block wasn't found, or not a valid index into metaVolumeIds - // Also skip if the DN responded with a block we didn't ask for - if (volumeIndex == Integer.MAX_VALUE - || volumeIndex >= metaVolumeIds.size() - || !blockIdToLocBlock.containsKey(blockId)) { - if (LOG.isDebugEnabled()) { - LOG.debug("No data for block " + blockId); - } - continue; - } - // Get the VolumeId by indexing into the list of VolumeIds - // provided by the datanode - byte[] volumeId = metaVolumeIds.get(volumeIndex); - HdfsVolumeId id = new HdfsVolumeId(volumeId); - // Find out which index we are in the LocatedBlock's replicas - LocatedBlock locBlock = blockIdToLocBlock.get(blockId); - DatanodeInfo[] dnInfos = locBlock.getLocations(); - int index = -1; - for (int k = 0; k < dnInfos.length; k++) { - if (dnInfos[k].equals(datanode)) { - index = k; - break; - } - } - if (index < 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Datanode responded with a block volume id we did" + - " not request, omitting."); - } - continue; - } - // Place VolumeId at the same index as the DN's index in the list of - // replicas - List<VolumeId> volumeIds = blockVolumeIds.get(locBlock); - volumeIds.set(index, id); - } - } - return blockVolumeIds; - } - - /** - * Helper method to combine a list of {@link LocatedBlock} with associated - * {@link VolumeId} information to form a list of {@link BlockStorageLocation} - * . - */ - static BlockStorageLocation[] convertToVolumeBlockLocations( - List<LocatedBlock> blocks, - Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException { - // Construct the final return value of VolumeBlockLocation[] - BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks); - List<BlockStorageLocation> volumeBlockLocs = - new ArrayList<BlockStorageLocation>(locations.length); - for (int i = 0; i < locations.length; i++) { - LocatedBlock locBlock = blocks.get(i); - List<VolumeId> volumeIds = blockVolumeIds.get(locBlock); - BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i], - volumeIds.toArray(new VolumeId[0])); - volumeBlockLocs.add(bsLoc); - } - return volumeBlockLocs.toArray(new BlockStorageLocation[] {}); - } - - /** - * Callable that sets up an RPC proxy to a datanode and queries it for - * volume location information for a list of ExtendedBlocks. - */ - private static class VolumeBlockLocationCallable implements - Callable<HdfsBlocksMetadata> { - - private final Configuration configuration; - private final int timeout; - private final DatanodeInfo datanode; - private final String poolId; - private final long[] blockIds; - private final List<Token<BlockTokenIdentifier>> dnTokens; - private final boolean connectToDnViaHostname; - private final Span parentSpan; - - VolumeBlockLocationCallable(Configuration configuration, - DatanodeInfo datanode, String poolId, long []blockIds, - List<Token<BlockTokenIdentifier>> dnTokens, int timeout, - boolean connectToDnViaHostname, Span parentSpan) { - this.configuration = configuration; - this.timeout = timeout; - this.datanode = datanode; - this.poolId = poolId; - this.blockIds = blockIds; - this.dnTokens = dnTokens; - this.connectToDnViaHostname = connectToDnViaHostname; - this.parentSpan = parentSpan; - } - - public DatanodeInfo getDatanodeInfo() { - return datanode; - } - - @Override - public HdfsBlocksMetadata call() throws Exception { - HdfsBlocksMetadata metadata = null; - // Create the RPC proxy and make the RPC - ClientDatanodeProtocol cdp = null; - TraceScope scope = - Trace.startSpan("getHdfsBlocksMetadata", parentSpan); - try { - cdp = DFSUtilClient.createClientDatanodeProtocolProxy( - datanode, configuration, - timeout, connectToDnViaHostname); - metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens); - } catch (IOException e) { - // Bubble this up to the caller, handle with the Future - throw e; - } finally { - scope.close(); - if (cdp != null) { - RPC.stopProxy(cdp); - } - } - return metadata; - } - } -}
