HDFS-8951. Move the shortcircuit package to hdfs-client. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f0f6f1c7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f0f6f1c7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f0f6f1c7 Branch: refs/heads/branch-2 Commit: f0f6f1c7e95b2d2a9ecd44a107c48b9ec965339b Parents: 9264b7e Author: Haohui Mai <[email protected]> Authored: Wed Aug 26 14:02:48 2015 -0700 Committer: Haohui Mai <[email protected]> Committed: Wed Aug 26 14:22:33 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSUtilClient.java | 26 + .../server/datanode/BlockMetadataHeader.java | 209 ++++ .../hadoop/hdfs/shortcircuit/ClientMmap.java | 75 ++ .../hdfs/shortcircuit/DomainSocketFactory.java | 196 ++++ .../hdfs/shortcircuit/ShortCircuitCache.java | 1066 +++++++++++++++++ .../hdfs/shortcircuit/ShortCircuitReplica.java | 352 ++++++ .../shortcircuit/ShortCircuitReplicaInfo.java | 64 ++ .../apache/hadoop/hdfs/util/IOUtilsClient.java | 46 + hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../apache/hadoop/hdfs/BlockReaderFactory.java | 2 +- .../java/org/apache/hadoop/hdfs/DFSClient.java | 25 - .../apache/hadoop/hdfs/RemoteBlockReader.java | 2 +- .../apache/hadoop/hdfs/RemoteBlockReader2.java | 2 +- .../server/datanode/BlockMetadataHeader.java | 211 ---- .../datanode/fsdataset/impl/FsDatasetImpl.java | 19 +- .../impl/RamDiskAsyncLazyPersistService.java | 8 +- .../hadoop/hdfs/shortcircuit/ClientMmap.java | 75 -- .../hdfs/shortcircuit/DomainSocketFactory.java | 194 ---- .../hdfs/shortcircuit/ShortCircuitCache.java | 1068 ------------------ .../hdfs/shortcircuit/ShortCircuitReplica.java | 349 ------ .../shortcircuit/ShortCircuitReplicaInfo.java | 64 -- 21 files changed, 2056 insertions(+), 2000 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index fa1f5e6..3d0acb0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -36,11 +36,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -429,4 +431,28 @@ public class DFSUtilClient { new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ENGLISH); return df.format(date); } + + private static final Map<String, Boolean> localAddrMap = Collections + .synchronizedMap(new HashMap<String, Boolean>()); + + public static boolean isLocalAddress(InetSocketAddress targetAddr) { + InetAddress addr = targetAddr.getAddress(); + Boolean cached = localAddrMap.get(addr.getHostAddress()); + if (cached != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("Address " + targetAddr + + (cached ? " is local" : " is not local")); + } + return cached; + } + + boolean local = NetUtils.isLocalAddress(addr); + + if (LOG.isTraceEnabled()) { + LOG.trace("Address " + targetAddr + + (local ? 
" is local" : " is not local")); + } + localAddrMap.put(addr.getHostAddress(), local); + return local; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java new file mode 100644 index 0000000..d298690 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.DataChecksum; + +import com.google.common.annotations.VisibleForTesting; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * BlockMetadataHeader manages metadata for data blocks on Datanodes. + * This is not related to the Block related functionality in Namenode. + * The biggest part of data block metadata is CRC for the block. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockMetadataHeader { + private static final Logger LOG = LoggerFactory.getLogger( + BlockMetadataHeader.class); + + public static final short VERSION = 1; + + /** + * Header includes everything except the checksum(s) themselves. + * Version is two bytes. Following it is the DataChecksum + * that occupies 5 bytes. + */ + private final short version; + private DataChecksum checksum = null; + + @VisibleForTesting + public BlockMetadataHeader(short version, DataChecksum checksum) { + this.checksum = checksum; + this.version = version; + } + + /** Get the version */ + public short getVersion() { + return version; + } + + /** Get the checksum */ + public DataChecksum getChecksum() { + return checksum; + } + + /** + * Read the checksum header from the meta file. + * @return the data checksum obtained from the header. 
+ */ + public static DataChecksum readDataChecksum(File metaFile, int bufSize) + throws IOException { + DataInputStream in = null; + try { + in = new DataInputStream(new BufferedInputStream( + new FileInputStream(metaFile), bufSize)); + return readDataChecksum(in, metaFile); + } finally { + IOUtils.closeStream(in); + } + } + + /** + * Read the checksum header from the meta input stream. + * @return the data checksum obtained from the header. + */ + public static DataChecksum readDataChecksum(final DataInputStream metaIn, + final Object name) throws IOException { + // read and handle the common header here. For now just a version + final BlockMetadataHeader header = readHeader(metaIn); + if (header.getVersion() != VERSION) { + LOG.warn("Unexpected meta-file version for " + name + + ": version in file is " + header.getVersion() + + " but expected version is " + VERSION); + } + return header.getChecksum(); + } + + /** + * Read the header without changing the position of the FileChannel. + * + * @param fc The FileChannel to read. + * @return the Metadata Header. + * @throws IOException on error. + */ + public static BlockMetadataHeader preadHeader(FileChannel fc) + throws IOException { + final byte arr[] = new byte[getHeaderSize()]; + ByteBuffer buf = ByteBuffer.wrap(arr); + + while (buf.hasRemaining()) { + if (fc.read(buf, 0) <= 0) { + throw new EOFException("unexpected EOF while reading " + + "metadata file header"); + } + } + short version = (short)((arr[0] << 8) | (arr[1] & 0xff)); + DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2); + return new BlockMetadataHeader(version, dataChecksum); + } + + /** + * This reads all the fields till the beginning of checksum. + * @return Metadata Header + * @throws IOException + */ + public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException { + return readHeader(in.readShort(), in); + } + + /** + * Reads header at the top of metadata file and returns the header. 
+ * + * @return metadata header for the block + * @throws IOException + */ + public static BlockMetadataHeader readHeader(File file) throws IOException { + DataInputStream in = null; + try { + in = new DataInputStream(new BufferedInputStream( + new FileInputStream(file))); + return readHeader(in); + } finally { + IOUtils.closeStream(in); + } + } + + /** + * Read the header at the beginning of the given block meta file. + * The current file position will be altered by this method. + * If an error occurs, the file is <em>not</em> closed. + */ + public static BlockMetadataHeader readHeader(RandomAccessFile raf) throws IOException { + byte[] buf = new byte[getHeaderSize()]; + raf.seek(0); + raf.readFully(buf, 0, buf.length); + return readHeader(new DataInputStream(new ByteArrayInputStream(buf))); + } + + // Version is already read. + private static BlockMetadataHeader readHeader(short version, DataInputStream in) + throws IOException { + DataChecksum checksum = DataChecksum.newDataChecksum(in); + return new BlockMetadataHeader(version, checksum); + } + + /** + * This writes all the fields till the beginning of checksum. + * @param out DataOutputStream + * @throws IOException + */ + @VisibleForTesting + public static void writeHeader(DataOutputStream out, + BlockMetadataHeader header) + throws IOException { + out.writeShort(header.getVersion()); + header.getChecksum().writeHeader(out); + } + + /** + * Writes all the fields till the beginning of checksum. 
+ * @throws IOException on error + */ + public static void writeHeader(DataOutputStream out, DataChecksum checksum) + throws IOException { + writeHeader(out, new BlockMetadataHeader(VERSION, checksum)); + } + + /** + * Returns the size of the header + */ + public static int getHeaderSize() { + return Short.SIZE/Byte.SIZE + DataChecksum.getChecksumHeaderSize(); + } +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java new file mode 100644 index 0000000..2d871fc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.shortcircuit; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.io.Closeable; +import java.nio.MappedByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A reference to a memory-mapped region used by an HDFS client. + */ +@InterfaceAudience.Private +public class ClientMmap implements Closeable { + static final Logger LOG = LoggerFactory.getLogger(ClientMmap.class); + + /** + * A reference to the block replica which this mmap relates to. + */ + private ShortCircuitReplica replica; + + /** + * The java ByteBuffer object. + */ + private final MappedByteBuffer map; + + /** + * Whether or not this ClientMmap anchors the replica into memory while + * it exists. Closing an anchored ClientMmap unanchors the replica. + */ + private final boolean anchored; + + ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map, + boolean anchored) { + this.replica = replica; + this.map = map; + this.anchored = anchored; + } + + /** + * Close the ClientMmap object. 
+ */ + @Override + public void close() { + if (replica != null) { + if (anchored) { + replica.removeNoChecksumAnchor(); + } + replica.unref(); + } + replica = null; + } + + public MappedByteBuffer getMappedByteBuffer() { + return map; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java new file mode 100644 index 0000000..6a7d39d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.shortcircuit; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.util.PerformanceAdvisory; + +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DomainSocketFactory { + private static final Logger LOG = LoggerFactory.getLogger( + DomainSocketFactory.class); + + public enum PathState { + UNUSABLE(false, false), + SHORT_CIRCUIT_DISABLED(true, false), + VALID(true, true); + + PathState(boolean usableForDataTransfer, boolean usableForShortCircuit) { + this.usableForDataTransfer = usableForDataTransfer; + this.usableForShortCircuit = usableForShortCircuit; + } + + public boolean getUsableForDataTransfer() { + return usableForDataTransfer; + } + + public boolean getUsableForShortCircuit() { + return usableForShortCircuit; + } + + private final boolean usableForDataTransfer; + private final boolean usableForShortCircuit; + } + + public static class PathInfo { + private final static PathInfo NOT_CONFIGURED = + new PathInfo("", PathState.UNUSABLE); + + final private String path; + final private PathState state; + + PathInfo(String path, PathState state) { + this.path = path; + this.state = state; + } + + public String getPath() { + return path; + } + + public PathState getPathState() { + return state; + } + + @Override + public String toString() { + return new StringBuilder().append("PathInfo{path=").append(path). 
+ append(", state=").append(state).append("}").toString(); + } + } + + /** + * Information about domain socket paths. + */ + final Cache<String, PathState> pathMap = + CacheBuilder.newBuilder() + .expireAfterWrite(10, TimeUnit.MINUTES) + .build(); + + public DomainSocketFactory(ShortCircuitConf conf) { + final String feature; + if (conf.isShortCircuitLocalReads() && (!conf.isUseLegacyBlockReaderLocal())) { + feature = "The short-circuit local reads feature"; + } else if (conf.isDomainSocketDataTraffic()) { + feature = "UNIX domain socket data traffic"; + } else { + feature = null; + } + + if (feature == null) { + PerformanceAdvisory.LOG.debug( + "Both short-circuit local reads and UNIX domain socket are disabled."); + } else { + if (conf.getDomainSocketPath().isEmpty()) { + throw new HadoopIllegalArgumentException(feature + " is enabled but " + + HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set."); + } else if (DomainSocket.getLoadingFailureReason() != null) { + LOG.warn(feature + " cannot be used because " + + DomainSocket.getLoadingFailureReason()); + } else { + LOG.debug(feature + " is enabled."); + } + } + } + + /** + * Get information about a domain socket path. + * + * @param addr The inet address to use. + * @param conf The client configuration. + * + * @return Information about the socket path. + */ + public PathInfo getPathInfo(InetSocketAddress addr, ShortCircuitConf conf) { + // If there is no domain socket path configured, we can't use domain + // sockets. + if (conf.getDomainSocketPath().isEmpty()) return PathInfo.NOT_CONFIGURED; + // If we can't do anything with the domain socket, don't create it. + if (!conf.isDomainSocketDataTraffic() && + (!conf.isShortCircuitLocalReads() || conf.isUseLegacyBlockReaderLocal())) { + return PathInfo.NOT_CONFIGURED; + } + // If the DomainSocket code is not loaded, we can't create + // DomainSocket objects. 
+ if (DomainSocket.getLoadingFailureReason() != null) { + return PathInfo.NOT_CONFIGURED; + } + // UNIX domain sockets can only be used to talk to local peers + if (!DFSUtilClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED; + String escapedPath = DomainSocket.getEffectivePath( + conf.getDomainSocketPath(), addr.getPort()); + PathState status = pathMap.getIfPresent(escapedPath); + if (status == null) { + return new PathInfo(escapedPath, PathState.VALID); + } else { + return new PathInfo(escapedPath, status); + } + } + + public DomainSocket createSocket(PathInfo info, int socketTimeout) { + Preconditions.checkArgument(info.getPathState() != PathState.UNUSABLE); + boolean success = false; + DomainSocket sock = null; + try { + sock = DomainSocket.connect(info.getPath()); + sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, socketTimeout); + success = true; + } catch (IOException e) { + LOG.warn("error creating DomainSocket", e); + // fall through + } finally { + if (!success) { + if (sock != null) { + IOUtils.closeQuietly(sock); + } + pathMap.put(info.getPath(), PathState.UNUSABLE); + sock = null; + } + } + return sock; + } + + public void disableShortCircuitForPath(String path) { + pathMap.put(path, PathState.SHORT_CIRCUIT_DISABLED); + } + + public void disableDomainSocketPath(String path) { + pathMap.put(path, PathState.UNUSABLE); + } + + @VisibleForTesting + public void clearPathMap() { + pathMap.invalidateAll(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java new file mode 100644 index 0000000..52c1a6e --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java @@ -0,0 +1,1066 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.shortcircuit; + +import java.io.BufferedOutputStream; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.net.DomainPeer; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; +import org.apache.hadoop.hdfs.util.IOUtilsClient; +import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.net.unix.DomainSocketWatcher; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.Waitable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The ShortCircuitCache tracks things which the client needs to access + * HDFS block files via short-circuit. + * + * These things include: memory-mapped regions, file descriptors, and shared + * memory areas for communicating with the DataNode. + */ +@InterfaceAudience.Private +public class ShortCircuitCache implements Closeable { + public static final Logger LOG = LoggerFactory.getLogger( + ShortCircuitCache.class); + + /** + * Expiry thread which makes sure that the file descriptors get closed + * after a while. + */ + private class CacheCleaner implements Runnable, Closeable { + private ScheduledFuture<?> future; + + /** + * Run the CacheCleaner thread. + * + * Whenever a thread requests a ShortCircuitReplica object, we will make + * sure it gets one. That ShortCircuitReplica object can then be re-used + * when another thread requests a ShortCircuitReplica object for the same + * block. So in that sense, there is no maximum size to the cache. 
+ * + * However, when a ShortCircuitReplica object is unreferenced by the + * thread(s) that are using it, it becomes evictable. There are two + * separate eviction lists-- one for mmaped objects, and another for + * non-mmaped objects. We do this in order to avoid having the regular + * files kick the mmaped files out of the cache too quickly. Reusing + * an already-existing mmap gives a huge performance boost, since the + * page table entries don't have to be re-populated. Both the mmap + * and non-mmap evictable lists have maximum sizes and maximum lifespans. + */ + @Override + public void run() { + ShortCircuitCache.this.lock.lock(); + try { + if (ShortCircuitCache.this.closed) return; + long curMs = Time.monotonicNow(); + + if (LOG.isDebugEnabled()) { + LOG.debug(this + ": cache cleaner running at " + curMs); + } + + int numDemoted = demoteOldEvictableMmaped(curMs); + int numPurged = 0; + Long evictionTimeNs = Long.valueOf(0); + while (true) { + Entry<Long, ShortCircuitReplica> entry = + evictable.ceilingEntry(evictionTimeNs); + if (entry == null) break; + evictionTimeNs = entry.getKey(); + long evictionTimeMs = + TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS); + if (evictionTimeMs + maxNonMmappedEvictableLifespanMs >= curMs) break; + ShortCircuitReplica replica = entry.getValue(); + if (LOG.isTraceEnabled()) { + LOG.trace("CacheCleaner: purging " + replica + ": " + + StringUtils.getStackTrace(Thread.currentThread())); + } + purge(replica); + numPurged++; + } + + if (LOG.isDebugEnabled()) { + LOG.debug(this + ": finishing cache cleaner run started at " + + curMs + ". 
Demoted " + numDemoted + " mmapped replicas; " + + "purged " + numPurged + " replicas."); + } + } finally { + ShortCircuitCache.this.lock.unlock(); + } + } + + @Override + public void close() throws IOException { + if (future != null) { + future.cancel(false); + } + } + + public void setFuture(ScheduledFuture<?> future) { + this.future = future; + } + + /** + * Get the rate at which this cleaner thread should be scheduled. + * + * We do this by taking the minimum expiration time and dividing by 4. + * + * @return the rate in milliseconds at which this thread should be + * scheduled. + */ + public long getRateInMs() { + long minLifespanMs = + Math.min(maxNonMmappedEvictableLifespanMs, + maxEvictableMmapedLifespanMs); + long sampleTimeMs = minLifespanMs / 4; + return (sampleTimeMs < 1) ? 1 : sampleTimeMs; + } + } + + /** + * A task which asks the DataNode to release a short-circuit shared memory + * slot. If successful, this will tell the DataNode to stop monitoring + * changes to the mlock status of the replica associated with the slot. + * It will also allow us (the client) to re-use this slot for another + * replica. If we can't communicate with the DataNode for some reason, + * we tear down the shared memory segment to avoid being in an inconsistent + * state. + */ + private class SlotReleaser implements Runnable { + /** + * The slot that we need to release. 
+ */ + private final Slot slot; + + SlotReleaser(Slot slot) { + this.slot = slot; + } + + @Override + public void run() { + if (LOG.isTraceEnabled()) { + LOG.trace(ShortCircuitCache.this + ": about to release " + slot); + } + final DfsClientShm shm = (DfsClientShm)slot.getShm(); + final DomainSocket shmSock = shm.getPeer().getDomainSocket(); + final String path = shmSock.getPath(); + boolean success = false; + try (DomainSocket sock = DomainSocket.connect(path); + DataOutputStream out = new DataOutputStream( + new BufferedOutputStream(sock.getOutputStream()))) { + new Sender(out).releaseShortCircuitFds(slot.getSlotId()); + DataInputStream in = new DataInputStream(sock.getInputStream()); + ReleaseShortCircuitAccessResponseProto resp = + ReleaseShortCircuitAccessResponseProto.parseFrom( + PBHelperClient.vintPrefixed(in)); + if (resp.getStatus() != Status.SUCCESS) { + String error = resp.hasError() ? resp.getError() : "(unknown)"; + throw new IOException(resp.getStatus().toString() + ": " + error); + } + if (LOG.isTraceEnabled()) { + LOG.trace(ShortCircuitCache.this + ": released " + slot); + } + success = true; + } catch (IOException e) { + LOG.error(ShortCircuitCache.this + ": failed to release " + + "short-circuit shared memory slot " + slot + " by sending " + + "ReleaseShortCircuitAccessRequestProto to " + path + + ". Closing shared memory segment.", e); + } finally { + if (success) { + shmManager.freeSlot(slot); + } else { + shm.getEndpointShmManager().shutdown(shm); + } + } + } + } + + public interface ShortCircuitReplicaCreator { + /** + * Attempt to create a ShortCircuitReplica object. + * + * This callback will be made without holding any locks. + * + * @return a non-null ShortCircuitReplicaInfo object. + */ + ShortCircuitReplicaInfo createShortCircuitReplicaInfo(); + } + + /** + * Lock protecting the cache. + */ + private final ReentrantLock lock = new ReentrantLock(); + + /** + * The executor service that runs the cacheCleaner. 
+ */ + private final ScheduledThreadPoolExecutor cleanerExecutor + = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder(). + setDaemon(true).setNameFormat("ShortCircuitCache_Cleaner"). + build()); + + /** + * The executor service that runs the SlotReleaser tasks. + */ + private final ScheduledThreadPoolExecutor releaserExecutor + = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder(). + setDaemon(true).setNameFormat("ShortCircuitCache_SlotReleaser"). + build()); + + /** + * A map containing all ShortCircuitReplicaInfo objects, organized by Key. + * ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken + * exception. + */ + private final HashMap<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> + replicaInfoMap = new HashMap<ExtendedBlockId, + Waitable<ShortCircuitReplicaInfo>>(); + + /** + * The CacheCleaner. We don't create this and schedule it until it becomes + * necessary. + */ + private CacheCleaner cacheCleaner; + + /** + * Tree of evictable elements. + * + * Maps (unique) insertion time in nanoseconds to the element. + */ + private final TreeMap<Long, ShortCircuitReplica> evictable = + new TreeMap<Long, ShortCircuitReplica>(); + + /** + * Maximum total size of the cache, including both mmapped and + * non-mmapped elements. + */ + private final int maxTotalSize; + + /** + * Non-mmaped elements older than this will be closed. + */ + private long maxNonMmappedEvictableLifespanMs; + + /** + * Tree of mmaped evictable elements. + * + * Maps (unique) insertion time in nanoseconds to the element. + */ + private final TreeMap<Long, ShortCircuitReplica> evictableMmapped = + new TreeMap<Long, ShortCircuitReplica>(); + + /** + * Maximum number of mmaped evictable elements. + */ + private int maxEvictableMmapedSize; + + /** + * Mmaped elements older than this will be closed. 
+ */ + private final long maxEvictableMmapedLifespanMs; + + /** + * The minimum number of milliseconds we'll wait after an unsuccessful + * mmap attempt before trying again. + */ + private final long mmapRetryTimeoutMs; + + /** + * How long we will keep replicas in the cache before declaring them + * to be stale. + */ + private final long staleThresholdMs; + + /** + * True if the ShortCircuitCache is closed. + */ + private boolean closed = false; + + /** + * Number of existing mmaps associated with this cache. + */ + private int outstandingMmapCount = 0; + + /** + * Manages short-circuit shared memory segments for the client. + */ + private final DfsClientShmManager shmManager; + + public static ShortCircuitCache fromConf(ShortCircuitConf conf) { + return new ShortCircuitCache( + conf.getShortCircuitStreamsCacheSize(), + conf.getShortCircuitStreamsCacheExpiryMs(), + conf.getShortCircuitMmapCacheSize(), + conf.getShortCircuitMmapCacheExpiryMs(), + conf.getShortCircuitMmapCacheRetryTimeout(), + conf.getShortCircuitCacheStaleThresholdMs(), + conf.getShortCircuitSharedMemoryWatcherInterruptCheckMs()); + } + + public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs, + int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs, + long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) { + Preconditions.checkArgument(maxTotalSize >= 0); + this.maxTotalSize = maxTotalSize; + Preconditions.checkArgument(maxNonMmappedEvictableLifespanMs >= 0); + this.maxNonMmappedEvictableLifespanMs = maxNonMmappedEvictableLifespanMs; + Preconditions.checkArgument(maxEvictableMmapedSize >= 0); + this.maxEvictableMmapedSize = maxEvictableMmapedSize; + Preconditions.checkArgument(maxEvictableMmapedLifespanMs >= 0); + this.maxEvictableMmapedLifespanMs = maxEvictableMmapedLifespanMs; + this.mmapRetryTimeoutMs = mmapRetryTimeoutMs; + this.staleThresholdMs = staleThresholdMs; + DfsClientShmManager shmManager = null; + if ((shmInterruptCheckMs > 0) && + 
(DomainSocketWatcher.getLoadingFailureReason() == null)) { + try { + shmManager = new DfsClientShmManager(shmInterruptCheckMs); + } catch (IOException e) { + LOG.error("failed to create ShortCircuitShmManager", e); + } + } + this.shmManager = shmManager; + } + + public long getStaleThresholdMs() { + return staleThresholdMs; + } + + /** + * Increment the reference count of a replica, and remove it from any free + * list it may be in. + * + * You must hold the cache lock while calling this function. + * + * @param replica The replica we're removing. + */ + private void ref(ShortCircuitReplica replica) { + lock.lock(); + try { + Preconditions.checkArgument(replica.refCount > 0, + "can't ref %s because its refCount reached %d", replica, + replica.refCount); + Long evictableTimeNs = replica.getEvictableTimeNs(); + replica.refCount++; + if (evictableTimeNs != null) { + String removedFrom = removeEvictable(replica); + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": " + removedFrom + + " no longer contains " + replica + ". refCount " + + (replica.refCount - 1) + " -> " + replica.refCount + + StringUtils.getStackTrace(Thread.currentThread())); + + } + } else if (LOG.isTraceEnabled()) { + LOG.trace(this + ": replica refCount " + + (replica.refCount - 1) + " -> " + replica.refCount + + StringUtils.getStackTrace(Thread.currentThread())); + } + } finally { + lock.unlock(); + } + } + + /** + * Unreference a replica. + * + * You must hold the cache lock while calling this function. + * + * @param replica The replica being unreferenced. + */ + void unref(ShortCircuitReplica replica) { + lock.lock(); + try { + // If the replica is stale or unusable, but we haven't purged it yet, + // let's do that. It would be a shame to evict a non-stale replica so + // that we could put a stale or unusable one into the cache. 
+ if (!replica.purged) { + String purgeReason = null; + if (!replica.getDataStream().getChannel().isOpen()) { + purgeReason = "purging replica because its data channel is closed."; + } else if (!replica.getMetaStream().getChannel().isOpen()) { + purgeReason = "purging replica because its meta channel is closed."; + } else if (replica.isStale()) { + purgeReason = "purging replica because it is stale."; + } + if (purgeReason != null) { + if (LOG.isDebugEnabled()) { + LOG.debug(this + ": " + purgeReason); + } + purge(replica); + } + } + String addedString = ""; + boolean shouldTrimEvictionMaps = false; + int newRefCount = --replica.refCount; + if (newRefCount == 0) { + // Close replica, since there are no remaining references to it. + Preconditions.checkArgument(replica.purged, + "Replica %s reached a refCount of 0 without being purged", replica); + replica.close(); + } else if (newRefCount == 1) { + Preconditions.checkState(null == replica.getEvictableTimeNs(), + "Replica %s had a refCount higher than 1, " + + "but was still evictable (evictableTimeNs = %d)", + replica, replica.getEvictableTimeNs()); + if (!replica.purged) { + // Add the replica to the end of an eviction list. + // Eviction lists are sorted by time. 
+ if (replica.hasMmap()) { + insertEvictable(System.nanoTime(), replica, evictableMmapped); + addedString = "added to evictableMmapped, "; + } else { + insertEvictable(System.nanoTime(), replica, evictable); + addedString = "added to evictable, "; + } + shouldTrimEvictionMaps = true; + } + } else { + Preconditions.checkArgument(replica.refCount >= 0, + "replica's refCount went negative (refCount = %d" + + " for %s)", replica.refCount, replica); + } + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": unref replica " + replica + + ": " + addedString + " refCount " + + (newRefCount + 1) + " -> " + newRefCount + + StringUtils.getStackTrace(Thread.currentThread())); + } + if (shouldTrimEvictionMaps) { + trimEvictionMaps(); + } + } finally { + lock.unlock(); + } + } + + /** + * Demote old evictable mmaps into the regular eviction map. + * + * You must hold the cache lock while calling this function. + * + * @param now Current time in monotonic milliseconds. + * @return Number of replicas demoted. + */ + private int demoteOldEvictableMmaped(long now) { + int numDemoted = 0; + boolean needMoreSpace = false; + Long evictionTimeNs = Long.valueOf(0); + + while (true) { + Entry<Long, ShortCircuitReplica> entry = + evictableMmapped.ceilingEntry(evictionTimeNs); + if (entry == null) break; + evictionTimeNs = entry.getKey(); + long evictionTimeMs = + TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS); + if (evictionTimeMs + maxEvictableMmapedLifespanMs >= now) { + if (evictableMmapped.size() < maxEvictableMmapedSize) { + break; + } + needMoreSpace = true; + } + ShortCircuitReplica replica = entry.getValue(); + if (LOG.isTraceEnabled()) { + String rationale = needMoreSpace ? 
"because we need more space" : + "because it's too old"; + LOG.trace("demoteOldEvictable: demoting " + replica + ": " + + rationale + ": " + + StringUtils.getStackTrace(Thread.currentThread())); + } + removeEvictable(replica, evictableMmapped); + munmap(replica); + insertEvictable(evictionTimeNs, replica, evictable); + numDemoted++; + } + return numDemoted; + } + + /** + * Trim the eviction lists. + */ + private void trimEvictionMaps() { + long now = Time.monotonicNow(); + demoteOldEvictableMmaped(now); + + while (true) { + long evictableSize = evictable.size(); + long evictableMmappedSize = evictableMmapped.size(); + if (evictableSize + evictableMmappedSize <= maxTotalSize) { + return; + } + ShortCircuitReplica replica; + if (evictableSize == 0) { + replica = evictableMmapped.firstEntry().getValue(); + } else { + replica = evictable.firstEntry().getValue(); + } + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": trimEvictionMaps is purging " + replica + + StringUtils.getStackTrace(Thread.currentThread())); + } + purge(replica); + } + } + + /** + * Munmap a replica, updating outstandingMmapCount. + * + * @param replica The replica to munmap. + */ + private void munmap(ShortCircuitReplica replica) { + replica.munmap(); + outstandingMmapCount--; + } + + /** + * Remove a replica from an evictable map. + * + * @param replica The replica to remove. + * @return The map it was removed from. + */ + private String removeEvictable(ShortCircuitReplica replica) { + if (replica.hasMmap()) { + removeEvictable(replica, evictableMmapped); + return "evictableMmapped"; + } else { + removeEvictable(replica, evictable); + return "evictable"; + } + } + + /** + * Remove a replica from an evictable map. + * + * @param replica The replica to remove. + * @param map The map to remove it from. 
+ */ + private void removeEvictable(ShortCircuitReplica replica, + TreeMap<Long, ShortCircuitReplica> map) { + Long evictableTimeNs = replica.getEvictableTimeNs(); + Preconditions.checkNotNull(evictableTimeNs); + ShortCircuitReplica removed = map.remove(evictableTimeNs); + Preconditions.checkState(removed == replica, + "failed to make %s unevictable", replica); + replica.setEvictableTimeNs(null); + } + + /** + * Insert a replica into an evictable map. + * + * If an element already exists with this eviction time, we add a nanosecond + * to it until we find an unused key. + * + * @param evictionTimeNs The eviction time in absolute nanoseconds. + * @param replica The replica to insert. + * @param map The map to insert it into. + */ + private void insertEvictable(Long evictionTimeNs, + ShortCircuitReplica replica, TreeMap<Long, ShortCircuitReplica> map) { + while (map.containsKey(evictionTimeNs)) { + evictionTimeNs++; + } + Preconditions.checkState(null == replica.getEvictableTimeNs()); + replica.setEvictableTimeNs(evictionTimeNs); + map.put(evictionTimeNs, replica); + } + + /** + * Purge a replica from the cache. + * + * This doesn't necessarily close the replica, since there may be + * outstanding references to it. However, it does mean the cache won't + * hand it out to anyone after this. + * + * You must hold the cache lock while calling this function. + * + * @param replica The replica being removed. 
+ */ + private void purge(ShortCircuitReplica replica) { + boolean removedFromInfoMap = false; + String evictionMapName = null; + Preconditions.checkArgument(!replica.purged); + replica.purged = true; + Waitable<ShortCircuitReplicaInfo> val = replicaInfoMap.get(replica.key); + if (val != null) { + ShortCircuitReplicaInfo info = val.getVal(); + if ((info != null) && (info.getReplica() == replica)) { + replicaInfoMap.remove(replica.key); + removedFromInfoMap = true; + } + } + Long evictableTimeNs = replica.getEvictableTimeNs(); + if (evictableTimeNs != null) { + evictionMapName = removeEvictable(replica); + } + if (LOG.isTraceEnabled()) { + StringBuilder builder = new StringBuilder(); + builder.append(this).append(": ").append(": purged "). + append(replica).append(" from the cache."); + if (removedFromInfoMap) { + builder.append(" Removed from the replicaInfoMap."); + } + if (evictionMapName != null) { + builder.append(" Removed from ").append(evictionMapName); + } + LOG.trace(builder.toString()); + } + unref(replica); + } + + /** + * Fetch or create a replica. + * + * You must hold the cache lock while calling this function. + * + * @param key Key to use for lookup. + * @param creator Replica creator callback. Will be called without + * the cache lock being held. + * + * @return Null if no replica could be found or created. + * The replica, otherwise. 
+ */ + public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key, + ShortCircuitReplicaCreator creator) { + Waitable<ShortCircuitReplicaInfo> newWaitable = null; + lock.lock(); + try { + ShortCircuitReplicaInfo info = null; + do { + if (closed) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": can't fetchOrCreate " + key + + " because the cache is closed."); + } + return null; + } + Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key); + if (waitable != null) { + try { + info = fetch(key, waitable); + } catch (RetriableException e) { + if (LOG.isDebugEnabled()) { + LOG.debug(this + ": retrying " + e.getMessage()); + } + continue; + } + } + } while (false); + if (info != null) return info; + // We need to load the replica ourselves. + newWaitable = new Waitable<ShortCircuitReplicaInfo>(lock.newCondition()); + replicaInfoMap.put(key, newWaitable); + } finally { + lock.unlock(); + } + return create(key, creator, newWaitable); + } + + /** + * Fetch an existing ReplicaInfo object. + * + * @param key The key that we're using. + * @param waitable The waitable object to wait on. + * @return The existing ReplicaInfo object, or null if there is + * none. + * + * @throws RetriableException If the caller needs to retry. + */ + private ShortCircuitReplicaInfo fetch(ExtendedBlockId key, + Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException { + // Another thread is already in the process of loading this + // ShortCircuitReplica. So we simply wait for it to complete. 
+ ShortCircuitReplicaInfo info; + try { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": found waitable for " + key); + } + info = waitable.await(); + } catch (InterruptedException e) { + LOG.info(this + ": interrupted while waiting for " + key); + Thread.currentThread().interrupt(); + throw new RetriableException("interrupted"); + } + if (info.getInvalidTokenException() != null) { + LOG.info(this + ": could not get " + key + " due to InvalidToken " + + "exception.", info.getInvalidTokenException()); + return info; + } + ShortCircuitReplica replica = info.getReplica(); + if (replica == null) { + LOG.warn(this + ": failed to get " + key); + return info; + } + if (replica.purged) { + // Ignore replicas that have already been purged from the cache. + throw new RetriableException("Ignoring purged replica " + + replica + ". Retrying."); + } + // Check if the replica is stale before using it. + // If it is, purge it and retry. + if (replica.isStale()) { + LOG.info(this + ": got stale replica " + replica + ". Removing " + + "this replica from the replicaInfoMap and retrying."); + // Remove the cache's reference to the replica. This may or may not + // trigger a close. + purge(replica); + throw new RetriableException("ignoring stale replica " + replica); + } + ref(replica); + return info; + } + + private ShortCircuitReplicaInfo create(ExtendedBlockId key, + ShortCircuitReplicaCreator creator, + Waitable<ShortCircuitReplicaInfo> newWaitable) { + // Handle loading a new replica. + ShortCircuitReplicaInfo info = null; + try { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": loading " + key); + } + info = creator.createShortCircuitReplicaInfo(); + } catch (RuntimeException e) { + LOG.warn(this + ": failed to load " + key, e); + } + if (info == null) info = new ShortCircuitReplicaInfo(); + lock.lock(); + try { + if (info.getReplica() != null) { + // On success, make sure the cache cleaner thread is running. 
+ if (LOG.isTraceEnabled()) { + LOG.trace(this + ": successfully loaded " + info.getReplica()); + } + startCacheCleanerThreadIfNeeded(); + // Note: new ShortCircuitReplicas start with a refCount of 2, + // indicating that both this cache and whoever requested the + // creation of the replica hold a reference. So we don't need + // to increment the reference count here. + } else { + // On failure, remove the waitable from the replicaInfoMap. + Waitable<ShortCircuitReplicaInfo> waitableInMap = replicaInfoMap.get(key); + if (waitableInMap == newWaitable) replicaInfoMap.remove(key); + if (info.getInvalidTokenException() != null) { + LOG.info(this + ": could not load " + key + " due to InvalidToken " + + "exception.", info.getInvalidTokenException()); + } else { + LOG.warn(this + ": failed to load " + key); + } + } + newWaitable.provide(info); + } finally { + lock.unlock(); + } + return info; + } + + private void startCacheCleanerThreadIfNeeded() { + if (cacheCleaner == null) { + cacheCleaner = new CacheCleaner(); + long rateMs = cacheCleaner.getRateInMs(); + ScheduledFuture<?> future = + cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs, + TimeUnit.MILLISECONDS); + cacheCleaner.setFuture(future); + if (LOG.isDebugEnabled()) { + LOG.debug(this + ": starting cache cleaner thread which will run " + + "every " + rateMs + " ms"); + } + } + } + + ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica, + boolean anchored) { + Condition newCond; + lock.lock(); + try { + while (replica.mmapData != null) { + if (replica.mmapData instanceof MappedByteBuffer) { + ref(replica); + MappedByteBuffer mmap = (MappedByteBuffer)replica.mmapData; + return new ClientMmap(replica, mmap, anchored); + } else if (replica.mmapData instanceof Long) { + long lastAttemptTimeMs = (Long)replica.mmapData; + long delta = Time.monotonicNow() - lastAttemptTimeMs; + if (delta < mmapRetryTimeoutMs) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": can't create client mmap for " + + 
replica + " because we failed to " + + "create one just " + delta + "ms ago."); + } + return null; + } + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": retrying client mmap for " + replica + + ", " + delta + " ms after the previous failure."); + } + } else if (replica.mmapData instanceof Condition) { + Condition cond = (Condition)replica.mmapData; + cond.awaitUninterruptibly(); + } else { + Preconditions.checkState(false, "invalid mmapData type %s", + replica.mmapData.getClass().getName()); + } + } + newCond = lock.newCondition(); + replica.mmapData = newCond; + } finally { + lock.unlock(); + } + MappedByteBuffer map = replica.loadMmapInternal(); + lock.lock(); + try { + if (map == null) { + replica.mmapData = Long.valueOf(Time.monotonicNow()); + newCond.signalAll(); + return null; + } else { + outstandingMmapCount++; + replica.mmapData = map; + ref(replica); + newCond.signalAll(); + return new ClientMmap(replica, map, anchored); + } + } finally { + lock.unlock(); + } + } + + /** + * Close the cache and free all associated resources. + */ + @Override + public void close() { + try { + lock.lock(); + if (closed) return; + closed = true; + LOG.info(this + ": closing"); + maxNonMmappedEvictableLifespanMs = 0; + maxEvictableMmapedSize = 0; + // Close and join cacheCleaner thread. + IOUtilsClient.cleanup(LOG, cacheCleaner); + // Purge all replicas. 
+ while (true) { + Entry<Long, ShortCircuitReplica> entry = evictable.firstEntry(); + if (entry == null) break; + purge(entry.getValue()); + } + while (true) { + Entry<Long, ShortCircuitReplica> entry = evictableMmapped.firstEntry(); + if (entry == null) break; + purge(entry.getValue()); + } + } finally { + lock.unlock(); + } + + releaserExecutor.shutdown(); + cleanerExecutor.shutdown(); + // wait for existing tasks to terminate + try { + if (!releaserExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + LOG.error("Forcing SlotReleaserThreadPool to shutdown!"); + releaserExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + releaserExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + LOG.error("Interrupted while waiting for SlotReleaserThreadPool " + + "to terminate", e); + } + + // wait for existing tasks to terminate + try { + if (!cleanerExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + LOG.error("Forcing CleanerThreadPool to shutdown!"); + cleanerExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + cleanerExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + LOG.error("Interrupted while waiting for CleanerThreadPool " + + "to terminate", e); + } + IOUtilsClient.cleanup(LOG, shmManager); + } + + @VisibleForTesting // ONLY for testing + public interface CacheVisitor { + void visit(int numOutstandingMmaps, + Map<ExtendedBlockId, ShortCircuitReplica> replicas, + Map<ExtendedBlockId, InvalidToken> failedLoads, + Map<Long, ShortCircuitReplica> evictable, + Map<Long, ShortCircuitReplica> evictableMmapped); + } + + @VisibleForTesting // ONLY for testing + public void accept(CacheVisitor visitor) { + lock.lock(); + try { + Map<ExtendedBlockId, ShortCircuitReplica> replicas = + new HashMap<ExtendedBlockId, ShortCircuitReplica>(); + Map<ExtendedBlockId, InvalidToken> failedLoads = + new HashMap<ExtendedBlockId, InvalidToken>(); + for (Entry<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> entry : + 
replicaInfoMap.entrySet()) { + Waitable<ShortCircuitReplicaInfo> waitable = entry.getValue(); + if (waitable.hasVal()) { + if (waitable.getVal().getReplica() != null) { + replicas.put(entry.getKey(), waitable.getVal().getReplica()); + } else { + // The exception may be null here, indicating a failed load that + // isn't the result of an invalid block token. + failedLoads.put(entry.getKey(), + waitable.getVal().getInvalidTokenException()); + } + } + } + if (LOG.isDebugEnabled()) { + StringBuilder builder = new StringBuilder(); + builder.append("visiting ").append(visitor.getClass().getName()). + append("with outstandingMmapCount=").append(outstandingMmapCount). + append(", replicas="); + String prefix = ""; + for (Entry<ExtendedBlockId, ShortCircuitReplica> entry : replicas.entrySet()) { + builder.append(prefix).append(entry.getValue()); + prefix = ","; + } + prefix = ""; + builder.append(", failedLoads="); + for (Entry<ExtendedBlockId, InvalidToken> entry : failedLoads.entrySet()) { + builder.append(prefix).append(entry.getValue()); + prefix = ","; + } + prefix = ""; + builder.append(", evictable="); + for (Entry<Long, ShortCircuitReplica> entry : evictable.entrySet()) { + builder.append(prefix).append(entry.getKey()). + append(":").append(entry.getValue()); + prefix = ","; + } + prefix = ""; + builder.append(", evictableMmapped="); + for (Entry<Long, ShortCircuitReplica> entry : evictableMmapped.entrySet()) { + builder.append(prefix).append(entry.getKey()). + append(":").append(entry.getValue()); + prefix = ","; + } + LOG.debug(builder.toString()); + } + visitor.visit(outstandingMmapCount, replicas, failedLoads, + evictable, evictableMmapped); + } finally { + lock.unlock(); + } + } + + @Override + public String toString() { + return "ShortCircuitCache(0x" + + Integer.toHexString(System.identityHashCode(this)) + ")"; + } + + /** + * Allocate a new shared memory slot. + * + * @param datanode The datanode to allocate a shm slot with. 
+ * @param peer A peer connected to the datanode. + * @param usedPeer Will be set to true if we use up the provided peer. + * @param blockId The block id and block pool id of the block we're + * allocating this slot for. + * @param clientName The name of the DFSClient allocating the shared + * memory. + * @return Null if short-circuit shared memory is disabled; + * a short-circuit memory slot otherwise. + * @throws IOException An exception if there was an error talking to + * the datanode. + */ + public Slot allocShmSlot(DatanodeInfo datanode, + DomainPeer peer, MutableBoolean usedPeer, + ExtendedBlockId blockId, String clientName) throws IOException { + if (shmManager != null) { + return shmManager.allocSlot(datanode, peer, usedPeer, + blockId, clientName); + } else { + return null; + } + } + + /** + * Free a slot immediately. + * + * ONLY use this if the DataNode is not yet aware of the slot. + * + * @param slot The slot to free. + */ + public void freeSlot(Slot slot) { + Preconditions.checkState(shmManager != null); + slot.makeInvalid(); + shmManager.freeSlot(slot); + } + + /** + * Schedule a shared memory slot to be released. + * + * @param slot The slot to release. 
+ */ + public void scheduleSlotReleaser(Slot slot) { + Preconditions.checkState(shmManager != null); + releaserExecutor.execute(new SlotReleaser(slot)); + } + + @VisibleForTesting + public DfsClientShmManager getDfsClientShmManager() { + return shmManager; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java new file mode 100644 index 0000000..37566e2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java @@ -0,0 +1,352 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.shortcircuit; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; +import org.apache.hadoop.hdfs.util.IOUtilsClient; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.util.Time; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A ShortCircuitReplica object contains file descriptors for a block that + * we are reading via short-circuit local reads. + * + * The file descriptors can be shared between multiple threads because + * all the operations we perform are stateless-- i.e., we use pread + * instead of read, to avoid using the shared position state. + */ [email protected] +public class ShortCircuitReplica { + public static final Logger LOG = LoggerFactory.getLogger( + ShortCircuitCache.class); + + /** + * Identifies this ShortCircuitReplica object. + */ + final ExtendedBlockId key; + + /** + * The block data input stream. + */ + private final FileInputStream dataStream; + + /** + * The block metadata input stream. + * + * TODO: make this nullable if the file has no checksums on disk. + */ + private final FileInputStream metaStream; + + /** + * Block metadata header. + */ + private final BlockMetadataHeader metaHeader; + + /** + * The cache we belong to. + */ + private final ShortCircuitCache cache; + + /** + * Monotonic time at which the replica was created. 
+ */ + private final long creationTimeMs; + + /** + * If non-null, the shared memory slot associated with this replica. + */ + private final Slot slot; + + /** + * Current mmap state. + * + * Protected by the cache lock. + */ + Object mmapData; + + /** + * True if this replica has been purged from the cache; false otherwise. + * + * Protected by the cache lock. + */ + boolean purged = false; + + /** + * Number of external references to this replica. Replicas are referenced + * by the cache, BlockReaderLocal instances, and by ClientMmap instances. + * The number starts at 2 because when we create a replica, it is referenced + * by both the cache and the requester. + * + * Protected by the cache lock. + */ + int refCount = 2; + + /** + * The monotonic time in nanoseconds at which the replica became evictable, or + * null if it is not evictable. + * + * Protected by the cache lock. + */ + private Long evictableTimeNs = null; + + public ShortCircuitReplica(ExtendedBlockId key, + FileInputStream dataStream, FileInputStream metaStream, + ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException { + this.key = key; + this.dataStream = dataStream; + this.metaStream = metaStream; + this.metaHeader = + BlockMetadataHeader.preadHeader(metaStream.getChannel()); + if (metaHeader.getVersion() != 1) { + throw new IOException("invalid metadata header version " + + metaHeader.getVersion() + ". Can only handle version 1."); + } + this.cache = cache; + this.creationTimeMs = creationTimeMs; + this.slot = slot; + } + + /** + * Decrement the reference count. + */ + public void unref() { + cache.unref(this); + } + + /** + * Check if the replica is stale. + * + * Must be called with the cache lock held. + */ + boolean isStale() { + if (slot != null) { + // Check staleness by looking at the shared memory area we use to + // communicate with the DataNode. + boolean stale = !slot.isValid(); + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": checked shared memory segment. 
isStale=" + stale); + } + return stale; + } else { + // Fall back to old, time-based staleness method. + long deltaMs = Time.monotonicNow() - creationTimeMs; + long staleThresholdMs = cache.getStaleThresholdMs(); + if (deltaMs > staleThresholdMs) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + " is stale because it's " + deltaMs + + " ms old, and staleThresholdMs = " + staleThresholdMs); + } + return true; + } else { + if (LOG.isTraceEnabled()) { + LOG.trace(this + " is not stale because it's only " + deltaMs + + " ms old, and staleThresholdMs = " + staleThresholdMs); + } + return false; + } + } + } + + /** + * Try to add a no-checksum anchor to our shared memory slot. + * + * It is only possible to add this anchor when the block is mlocked on the Datanode. + * The DataNode will not munlock the block until the number of no-checksum anchors + * for the block reaches zero. + * + * This method does not require any synchronization. + * + * @return True if we successfully added a no-checksum anchor. + */ + public boolean addNoChecksumAnchor() { + if (slot == null) { + return false; + } + boolean result = slot.addAnchor(); + if (LOG.isTraceEnabled()) { + if (result) { + LOG.trace(this + ": added no-checksum anchor to slot " + slot); + } else { + LOG.trace(this + ": could not add no-checksum anchor to slot " + slot); + } + } + return result; + } + + /** + * Remove a no-checksum anchor for our shared memory slot. + * + * This method does not require any synchronization. + */ + public void removeNoChecksumAnchor() { + if (slot != null) { + slot.removeAnchor(); + } + } + + /** + * Check if the replica has an associated mmap that has been fully loaded. + * + * Must be called with the cache lock held. + */ + @VisibleForTesting + public boolean hasMmap() { + return ((mmapData != null) && (mmapData instanceof MappedByteBuffer)); + } + + /** + * Free the mmap associated with this replica. + * + * Must be called with the cache lock held. 
+ */ + void munmap() { + MappedByteBuffer mmap = (MappedByteBuffer)mmapData; + NativeIO.POSIX.munmap(mmap); + mmapData = null; + } + + /** + * Close the replica. + * + * Must be called after there are no more references to the replica in the + * cache or elsewhere. + */ + void close() { + String suffix = ""; + + Preconditions.checkState(refCount == 0, + "tried to close replica with refCount %d: %s", refCount, this); + refCount = -1; + Preconditions.checkState(purged, + "tried to close unpurged replica %s", this); + if (hasMmap()) { + munmap(); + if (LOG.isTraceEnabled()) { + suffix += " munmapped."; + } + } + IOUtilsClient.cleanup(LOG, dataStream, metaStream); + if (slot != null) { + cache.scheduleSlotReleaser(slot); + if (LOG.isTraceEnabled()) { + suffix += " scheduling " + slot + " for later release."; + } + } + if (LOG.isTraceEnabled()) { + LOG.trace("closed " + this + suffix); + } + } + + public FileInputStream getDataStream() { + return dataStream; + } + + public FileInputStream getMetaStream() { + return metaStream; + } + + public BlockMetadataHeader getMetaHeader() { + return metaHeader; + } + + public ExtendedBlockId getKey() { + return key; + } + + public ClientMmap getOrCreateClientMmap(boolean anchor) { + return cache.getOrCreateClientMmap(this, anchor); + } + + MappedByteBuffer loadMmapInternal() { + try { + FileChannel channel = dataStream.getChannel(); + MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, + Math.min(Integer.MAX_VALUE, channel.size())); + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": created mmap of size " + channel.size()); + } + return mmap; + } catch (IOException e) { + LOG.warn(this + ": mmap error", e); + return null; + } catch (RuntimeException e) { + LOG.warn(this + ": mmap error", e); + return null; + } + } + + /** + * Get the evictable time in nanoseconds. + * + * Note: you must hold the cache lock to call this function. + * + * @return the evictable time in nanoseconds. 
+ */ + public Long getEvictableTimeNs() { + return evictableTimeNs; + } + + /** + * Set the evictable time in nanoseconds. + * + * Note: you must hold the cache lock to call this function. + * + * @param evictableTimeNs The evictable time in nanoseconds, or null + * to set no evictable time. + */ + void setEvictableTimeNs(Long evictableTimeNs) { + this.evictableTimeNs = evictableTimeNs; + } + + @VisibleForTesting + public Slot getSlot() { + return slot; + } + + /** + * Convert the replica to a string for debugging purposes. + * Note that we can't take the lock here. + */ + @Override + public String toString() { + return new StringBuilder().append("ShortCircuitReplica{"). + append("key=").append(key). + append(", metaHeader.version=").append(metaHeader.getVersion()). + append(", metaHeader.checksum=").append(metaHeader.getChecksum()). + append(", ident=").append("0x"). + append(Integer.toHexString(System.identityHashCode(this))). + append(", creationTimeMs=").append(creationTimeMs). + append("}").toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java new file mode 100644 index 0000000..ef0019f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.shortcircuit; + +import org.apache.hadoop.security.token.SecretManager.InvalidToken; + +public final class ShortCircuitReplicaInfo { + private final ShortCircuitReplica replica; + private final InvalidToken exc; + + public ShortCircuitReplicaInfo() { + this.replica = null; + this.exc = null; + } + + public ShortCircuitReplicaInfo(ShortCircuitReplica replica) { + this.replica = replica; + this.exc = null; + } + + public ShortCircuitReplicaInfo(InvalidToken exc) { + this.replica = null; + this.exc = exc; + } + + public ShortCircuitReplica getReplica() { + return replica; + } + + public InvalidToken getInvalidTokenException() { + return exc; + } + + public String toString() { + StringBuilder builder = new StringBuilder(); + String prefix = ""; + builder.append("ShortCircuitReplicaInfo{"); + if (replica != null) { + builder.append(prefix).append(replica); + prefix = ", "; + } + if (exc != null) { + builder.append(prefix).append(exc); + prefix = ", "; + } + builder.append("}"); + return builder.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java new file mode 100644 index 0000000..56f8ecc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import org.slf4j.Logger; + +import java.io.IOException; + +public class IOUtilsClient { + /** + * Close the Closeable objects and <b>ignore</b> any {@link IOException} or + * null pointers. Must only be used for cleanup in exception handlers. + * + * @param log the log to record problems to at debug level. Can be null. + * @param closeables the objects to close + */ + public static void cleanup(Logger log, java.io.Closeable... 
closeables) { + for (java.io.Closeable c : closeables) { + if (c != null) { + try { + c.close(); + } catch(Throwable e) { + if (log != null && log.isDebugEnabled()) { + log.debug("Exception in closing " + c, e); + } + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 5373e66..5332fc2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -493,6 +493,9 @@ Release 2.8.0 - UNRELEASED HDFS-8846. Add a unit test for INotify functionality across a layout version upgrade (Zhe Zhang via Colin P. McCabe) + HDFS-8951. Move the shortcircuit package to hdfs-client. + (Mingliang Liu via wheat9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index fec6b85..52ba899 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -419,7 +419,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { if (LOG.isTraceEnabled()) { LOG.trace(this + ": trying to construct BlockReaderLocalLegacy"); } - if (!DFSClient.isLocalAddress(inetSocketAddress)) { + if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": can't construct 
BlockReaderLocalLegacy because " + "the address " + inetSocketAddress + " is not local"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 12646b5..93c5e00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -40,7 +40,6 @@ import java.net.URI; import java.net.UnknownHostException; import java.security.GeneralSecurityException; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedHashMap; @@ -707,30 +706,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } } - private static final Map<String, Boolean> localAddrMap = Collections - .synchronizedMap(new HashMap<String, Boolean>()); - - public static boolean isLocalAddress(InetSocketAddress targetAddr) { - InetAddress addr = targetAddr.getAddress(); - Boolean cached = localAddrMap.get(addr.getHostAddress()); - if (cached != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("Address " + targetAddr + - (cached ? " is local" : " is not local")); - } - return cached; - } - - boolean local = NetUtils.isLocalAddress(addr); - - if (LOG.isTraceEnabled()) { - LOG.trace("Address " + targetAddr + - (local ? 
" is local" : " is not local")); - } - localAddrMap.put(addr.getHostAddress(), local); - return local; - } - /** * Cancel a delegation token * @param token the token to cancel http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 05a9f2c..015e154 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -351,7 +351,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { checksum.getBytesPerChecksum(), checksum.getChecksumSize()); - this.isLocal = DFSClient.isLocalAddress(NetUtils. + this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. 
createSocketAddr(datanodeID.getXferAddr())); this.peer = peer; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f6f1c7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 4c23d36..2a77cb6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -290,7 +290,7 @@ public class RemoteBlockReader2 implements BlockReader { DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, DatanodeID datanodeID, PeerCache peerCache) { - this.isLocal = DFSClient.isLocalAddress(NetUtils. + this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. createSocketAddr(datanodeID.getXferAddr())); // Path is used only for printing block and file information in debug this.peer = peer;
