http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java deleted file mode 100644 index 5392c66..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.classification.InterfaceAudience; - -/** - * Used for injecting faults in DFSClient and DFSOutputStream tests. - * Calls into this are a no-op in production code. 
- */ -@VisibleForTesting -@InterfaceAudience.Private -public class DFSClientFaultInjector { - public static DFSClientFaultInjector instance = new DFSClientFaultInjector(); - public static AtomicLong exceptionNum = new AtomicLong(0); - - public static DFSClientFaultInjector get() { - return instance; - } - - public boolean corruptPacket() { - return false; - } - - public boolean uncorruptPacket() { - return false; - } - - public boolean failPacket() { - return false; - } - - public void startFetchFromDatanode() {} - - public void fetchFromDatanodeException() {} - - public void readFromDatanodeDelay() {} -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 3bad9d2..f289b32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hdfs; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java deleted file mode 100644 index 2a228e8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.hadoop.classification.InterfaceAudience; - -import java.util.concurrent.atomic.AtomicLong; - -/** - * The client-side metrics for hedged read feature. - * This class has a number of metrics variables that are publicly accessible, - * we can grab them from client side, like HBase. - */ -@InterfaceAudience.Private -public class DFSHedgedReadMetrics { - public final AtomicLong hedgedReadOps = new AtomicLong(); - public final AtomicLong hedgedReadOpsWin = new AtomicLong(); - public final AtomicLong hedgedReadOpsInCurThread = new AtomicLong(); - - public void incHedgedReadOps() { - hedgedReadOps.incrementAndGet(); - } - - public void incHedgedReadOpsInCurThread() { - hedgedReadOpsInCurThread.incrementAndGet(); - } - - public void incHedgedReadWins() { - hedgedReadOpsWin.incrementAndGet(); - } - - public long getHedgedReadOps() { - return hedgedReadOps.longValue(); - } - - public long getHedgedReadOpsInCurThread() { - return hedgedReadOpsInCurThread.longValue(); - } - - public long getHedgedReadWins() { - return hedgedReadOpsWin.longValue(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java deleted file mode 
100644 index 1f9e3e9..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java +++ /dev/null @@ -1,239 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import com.google.common.collect.Iterators; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.inotify.EventBatch; -import org.apache.hadoop.hdfs.inotify.EventBatchList; -import org.apache.hadoop.hdfs.inotify.MissingEventsException; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.util.Time; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -/** - * Stream for reading inotify events. DFSInotifyEventInputStreams should not - * be shared among multiple threads. 
- */ -@InterfaceAudience.Public -@InterfaceStability.Unstable -public class DFSInotifyEventInputStream { - public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream - .class); - - /** - * The trace sampler to use when making RPCs to the NameNode. - */ - private final Sampler<?> traceSampler; - - private final ClientProtocol namenode; - private Iterator<EventBatch> it; - private long lastReadTxid; - /** - * The most recent txid the NameNode told us it has sync'ed -- helps us - * determine how far behind we are in the edit stream. - */ - private long syncTxid; - /** - * Used to generate wait times in {@link DFSInotifyEventInputStream#take()}. - */ - private Random rng = new Random(); - - private static final int INITIAL_WAIT_MS = 10; - - DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode) - throws IOException { - // Only consider new transaction IDs. - this(traceSampler, namenode, namenode.getCurrentEditLogTxid()); - } - - DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode, - long lastReadTxid) throws IOException { - this.traceSampler = traceSampler; - this.namenode = namenode; - this.it = Iterators.emptyIterator(); - this.lastReadTxid = lastReadTxid; - } - - /** - * Returns the next batch of events in the stream or null if no new - * batches are currently available. - * - * @throws IOException because of network error or edit log - * corruption. Also possible if JournalNodes are unresponsive in the - * QJM setting (even one unresponsive JournalNode is enough in rare cases), - * so catching this exception and retrying at least a few times is - * recommended. - * @throws MissingEventsException if we cannot return the next batch in the - * stream because the data for the events (and possibly some subsequent - * events) has been deleted (generally because this stream is a very large - * number of transactions behind the current state of the NameNode). 
It is - * safe to continue reading from the stream after this exception is thrown - * The next available batch of events will be returned. - */ - public EventBatch poll() throws IOException, MissingEventsException { - TraceScope scope = - Trace.startSpan("inotifyPoll", traceSampler); - try { - // need to keep retrying until the NN sends us the latest committed txid - if (lastReadTxid == -1) { - LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); - lastReadTxid = namenode.getCurrentEditLogTxid(); - return null; - } - if (!it.hasNext()) { - EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); - if (el.getLastTxid() != -1) { - // we only want to set syncTxid when we were actually able to read some - // edits on the NN -- otherwise it will seem like edits are being - // generated faster than we can read them when the problem is really - // that we are temporarily unable to read edits - syncTxid = el.getSyncTxid(); - it = el.getBatches().iterator(); - long formerLastReadTxid = lastReadTxid; - lastReadTxid = el.getLastTxid(); - if (el.getFirstTxid() != formerLastReadTxid + 1) { - throw new MissingEventsException(formerLastReadTxid + 1, - el.getFirstTxid()); - } - } else { - LOG.debug("poll(): read no edits from the NN when requesting edits " + - "after txid {}", lastReadTxid); - return null; - } - } - - if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the - // newly seen edit log ops actually got converted to events - return it.next(); - } else { - return null; - } - } finally { - scope.close(); - } - } - - /** - * Return a estimate of how many transaction IDs behind the NameNode's - * current state this stream is. Clients should periodically call this method - * and check if its result is steadily increasing, which indicates that they - * are falling behind (i.e. transaction are being generated faster than the - * client is reading them). 
If a client falls too far behind events may be - * deleted before the client can read them. - * <p/> - * A return value of -1 indicates that an estimate could not be produced, and - * should be ignored. The value returned by this method is really only useful - * when compared to previous or subsequent returned values. - */ - public long getTxidsBehindEstimate() { - if (syncTxid == 0) { - return -1; - } else { - assert syncTxid >= lastReadTxid; - // this gives the difference between the last txid we have fetched to the - // client and syncTxid at the time we last fetched events from the - // NameNode - return syncTxid - lastReadTxid; - } - } - - /** - * Returns the next event batch in the stream, waiting up to the specified - * amount of time for a new batch. Returns null if one is not available at the - * end of the specified amount of time. The time before the method returns may - * exceed the specified amount of time by up to the time required for an RPC - * to the NameNode. - * - * @param time number of units of the given TimeUnit to wait - * @param tu the desired TimeUnit - * @throws IOException see {@link DFSInotifyEventInputStream#poll()} - * @throws MissingEventsException - * see {@link DFSInotifyEventInputStream#poll()} - * @throws InterruptedException if the calling thread is interrupted - */ - public EventBatch poll(long time, TimeUnit tu) throws IOException, - InterruptedException, MissingEventsException { - TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler); - EventBatch next = null; - try { - long initialTime = Time.monotonicNow(); - long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); - long nextWait = INITIAL_WAIT_MS; - while ((next = poll()) == null) { - long timeLeft = totalWait - (Time.monotonicNow() - initialTime); - if (timeLeft <= 0) { - LOG.debug("timed poll(): timed out"); - break; - } else if (timeLeft < nextWait * 2) { - nextWait = timeLeft; - } else { - nextWait *= 2; - } - LOG.debug("timed poll(): poll() 
returned null, sleeping for {} ms", - nextWait); - Thread.sleep(nextWait); - } - } finally { - scope.close(); - } - return next; - } - - /** - * Returns the next batch of events in the stream, waiting indefinitely if - * a new batch is not immediately available. - * - * @throws IOException see {@link DFSInotifyEventInputStream#poll()} - * @throws MissingEventsException see - * {@link DFSInotifyEventInputStream#poll()} - * @throws InterruptedException if the calling thread is interrupted - */ - public EventBatch take() throws IOException, InterruptedException, - MissingEventsException { - TraceScope scope = Trace.startSpan("inotifyTake", traceSampler); - EventBatch next = null; - try { - int nextWaitMin = INITIAL_WAIT_MS; - while ((next = poll()) == null) { - // sleep for a random period between nextWaitMin and nextWaitMin * 2 - // to avoid stampedes at the NN if there are multiple clients - int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin); - LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime); - Thread.sleep(sleepTime); - // the maximum sleep is 2 minutes - nextWaitMin = Math.min(60000, nextWaitMin * 2); - } - } finally { - scope.close(); - } - - return next; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java deleted file mode 100644 index 139a27c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ /dev/null @@ -1,1915 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.EOFException; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.google.common.base.Preconditions; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ByteBufferReadable; -import org.apache.hadoop.fs.ByteBufferUtil; -import org.apache.hadoop.fs.CanSetDropBehind; -import org.apache.hadoop.fs.CanSetReadahead; -import 
org.apache.hadoop.fs.CanUnbuffer; -import org.apache.hadoop.fs.ChecksumException; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FileEncryptionInfo; -import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.fs.UnresolvedLinkException; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.io.ByteBufferPool; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.IdentityHashStore; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; - -import com.google.common.annotations.VisibleForTesting; - -/**************************************************************** - * DFSInputStream provides bytes from a named file. It handles - * negotiation of the namenode and various datanodes as necessary. 
- ****************************************************************/ -@InterfaceAudience.Private -public class DFSInputStream extends FSInputStream -implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, - HasEnhancedByteBufferAccess, CanUnbuffer { - @VisibleForTesting - public static boolean tcpReadsDisabledForTesting = false; - private long hedgedReadOpsLoopNumForTesting = 0; - protected final DFSClient dfsClient; - protected AtomicBoolean closed = new AtomicBoolean(false); - protected final String src; - protected final boolean verifyChecksum; - - // state by stateful read only: - // (protected by lock on this) - ///// - private DatanodeInfo currentNode = null; - protected LocatedBlock currentLocatedBlock = null; - protected long pos = 0; - protected long blockEnd = -1; - private BlockReader blockReader = null; - //// - - // state shared by stateful and positional read: - // (protected by lock on infoLock) - //// - protected LocatedBlocks locatedBlocks = null; - private long lastBlockBeingWrittenLength = 0; - private FileEncryptionInfo fileEncryptionInfo = null; - protected CachingStrategy cachingStrategy; - //// - - protected final ReadStatistics readStatistics = new ReadStatistics(); - // lock for state shared between read and pread - // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks - // (it's OK to acquire this lock when the lock on <this> is held) - protected final Object infoLock = new Object(); - - /** - * Track the ByteBuffers that we have handed out to readers. - * - * The value type can be either ByteBufferPool or ClientMmap, depending on - * whether we this is a memory-mapped buffer or not. 
- */ - private IdentityHashStore<ByteBuffer, Object> extendedReadBuffers; - - private synchronized IdentityHashStore<ByteBuffer, Object> - getExtendedReadBuffers() { - if (extendedReadBuffers == null) { - extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0); - } - return extendedReadBuffers; - } - - public static class ReadStatistics { - public ReadStatistics() { - clear(); - } - - public ReadStatistics(ReadStatistics rhs) { - this.totalBytesRead = rhs.getTotalBytesRead(); - this.totalLocalBytesRead = rhs.getTotalLocalBytesRead(); - this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead(); - this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead(); - } - - /** - * @return The total bytes read. This will always be at least as - * high as the other numbers, since it includes all of them. - */ - public long getTotalBytesRead() { - return totalBytesRead; - } - - /** - * @return The total local bytes read. This will always be at least - * as high as totalShortCircuitBytesRead, since all short-circuit - * reads are also local. - */ - public long getTotalLocalBytesRead() { - return totalLocalBytesRead; - } - - /** - * @return The total short-circuit local bytes read. - */ - public long getTotalShortCircuitBytesRead() { - return totalShortCircuitBytesRead; - } - - /** - * @return The total number of zero-copy bytes read. - */ - public long getTotalZeroCopyBytesRead() { - return totalZeroCopyBytesRead; - } - - /** - * @return The total number of bytes read which were not local. 
- */ - public long getRemoteBytesRead() { - return totalBytesRead - totalLocalBytesRead; - } - - void addRemoteBytes(long amt) { - this.totalBytesRead += amt; - } - - void addLocalBytes(long amt) { - this.totalBytesRead += amt; - this.totalLocalBytesRead += amt; - } - - void addShortCircuitBytes(long amt) { - this.totalBytesRead += amt; - this.totalLocalBytesRead += amt; - this.totalShortCircuitBytesRead += amt; - } - - void addZeroCopyBytes(long amt) { - this.totalBytesRead += amt; - this.totalLocalBytesRead += amt; - this.totalShortCircuitBytesRead += amt; - this.totalZeroCopyBytesRead += amt; - } - - void clear() { - this.totalBytesRead = 0; - this.totalLocalBytesRead = 0; - this.totalShortCircuitBytesRead = 0; - this.totalZeroCopyBytesRead = 0; - } - - private long totalBytesRead; - - private long totalLocalBytesRead; - - private long totalShortCircuitBytesRead; - - private long totalZeroCopyBytesRead; - } - - /** - * This variable tracks the number of failures since the start of the - * most recent user-facing operation. That is to say, it should be reset - * whenever the user makes a call on this stream, and if at any point - * during the retry logic, the failure count exceeds a threshold, - * the errors will be thrown back to the operation. - * - * Specifically this counts the number of times the client has gone - * back to the namenode to get a new list of block locations, and is - * capped at maxBlockAcquireFailures - */ - protected int failures = 0; - - /* XXX Use of CocurrentHashMap is temp fix. 
Need to fix - * parallel accesses to DFSInputStream (through ptreads) properly */ - private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = - new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>(); - - private byte[] oneByteBuf; // used for 'int read()' - - void addToDeadNodes(DatanodeInfo dnInfo) { - deadNodes.put(dnInfo, dnInfo); - } - - DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, - LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException { - this.dfsClient = dfsClient; - this.verifyChecksum = verifyChecksum; - this.src = src; - synchronized (infoLock) { - this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy(); - } - this.locatedBlocks = locatedBlocks; - openInfo(false); - } - - /** - * Grab the open-file info from namenode - * @param refreshLocatedBlocks whether to re-fetch locatedblocks - */ - void openInfo(boolean refreshLocatedBlocks) throws IOException, - UnresolvedLinkException { - final DfsClientConf conf = dfsClient.getConf(); - synchronized(infoLock) { - lastBlockBeingWrittenLength = - fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks); - int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength(); - while (retriesForLastBlockLength > 0) { - // Getting last block length as -1 is a special case. When cluster - // restarts, DNs may not report immediately. At this time partial block - // locations will not be available with NN for getting the length. Lets - // retry for 3 times to get the length. - if (lastBlockBeingWrittenLength == -1) { - DFSClient.LOG.warn("Last block locations not available. " - + "Datanodes might not have reported blocks completely." 
- + " Will retry for " + retriesForLastBlockLength + " times"); - waitFor(conf.getRetryIntervalForGetLastBlockLength()); - lastBlockBeingWrittenLength = - fetchLocatedBlocksAndGetLastBlockLength(true); - } else { - break; - } - retriesForLastBlockLength--; - } - if (retriesForLastBlockLength == 0) { - throw new IOException("Could not obtain the last block locations."); - } - } - } - - private void waitFor(int waitTime) throws IOException { - try { - Thread.sleep(waitTime); - } catch (InterruptedException e) { - throw new IOException( - "Interrupted while getting the last block length."); - } - } - - private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) - throws IOException { - LocatedBlocks newInfo = locatedBlocks; - if (locatedBlocks == null || refresh) { - newInfo = dfsClient.getLocatedBlocks(src, 0); - } - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("newInfo = " + newInfo); - } - if (newInfo == null) { - throw new IOException("Cannot open filename " + src); - } - - if (locatedBlocks != null) { - Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator(); - Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator(); - while (oldIter.hasNext() && newIter.hasNext()) { - if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) { - throw new IOException("Blocklist for " + src + " has changed!"); - } - } - } - locatedBlocks = newInfo; - long lastBlockBeingWrittenLength = 0; - if (!locatedBlocks.isLastBlockComplete()) { - final LocatedBlock last = locatedBlocks.getLastLocatedBlock(); - if (last != null) { - if (last.getLocations().length == 0) { - if (last.getBlockSize() == 0) { - // if the length is zero, then no data has been written to - // datanode. So no need to wait for the locations. 
- return 0; - } - return -1; - } - final long len = readBlockLength(last); - last.getBlock().setNumBytes(len); - lastBlockBeingWrittenLength = len; - } - } - - fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo(); - - return lastBlockBeingWrittenLength; - } - - /** Read the block length from one of the datanodes. */ - private long readBlockLength(LocatedBlock locatedblock) throws IOException { - assert locatedblock != null : "LocatedBlock cannot be null"; - int replicaNotFoundCount = locatedblock.getLocations().length; - - final DfsClientConf conf = dfsClient.getConf(); - for(DatanodeInfo datanode : locatedblock.getLocations()) { - ClientDatanodeProtocol cdp = null; - - try { - cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode, - dfsClient.getConfiguration(), conf.getSocketTimeout(), - conf.isConnectToDnViaHostname(), locatedblock); - - final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); - - if (n >= 0) { - return n; - } - } - catch(IOException ioe) { - if (ioe instanceof RemoteException && - (((RemoteException) ioe).unwrapRemoteException() instanceof - ReplicaNotFoundException)) { - // special case : replica might not be on the DN, treat as 0 length - replicaNotFoundCount--; - } - - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode " - + datanode + " for block " + locatedblock.getBlock(), ioe); - } - } finally { - if (cdp != null) { - RPC.stopProxy(cdp); - } - } - } - - // Namenode told us about these locations, but none know about the replica - // means that we hit the race between pipeline creation start and end. - // we require all 3 because some other exception could have happened - // on a DN that has it. we want to report that error - if (replicaNotFoundCount == 0) { - return 0; - } - - throw new IOException("Cannot obtain block length for " + locatedblock); - } - - public long getFileLength() { - synchronized(infoLock) { - return locatedBlocks == null? 
0: - locatedBlocks.getFileLength() + lastBlockBeingWrittenLength; - } - } - - // Short circuit local reads are forbidden for files that are - // under construction. See HDFS-2757. - boolean shortCircuitForbidden() { - synchronized(infoLock) { - return locatedBlocks.isUnderConstruction(); - } - } - - /** - * Returns the datanode from which the stream is currently reading. - */ - public synchronized DatanodeInfo getCurrentDatanode() { - return currentNode; - } - - /** - * Returns the block containing the target position. - */ - synchronized public ExtendedBlock getCurrentBlock() { - if (currentLocatedBlock == null){ - return null; - } - return currentLocatedBlock.getBlock(); - } - - /** - * Return collection of blocks that has already been located. - */ - public List<LocatedBlock> getAllBlocks() throws IOException { - return getBlockRange(0, getFileLength()); - } - - /** - * Get block at the specified position. - * Fetch it from the namenode if not cached. - * - * @param offset block corresponding to this offset in file is returned - * @return located block - * @throws IOException - */ - protected LocatedBlock getBlockAt(long offset) throws IOException { - synchronized(infoLock) { - assert (locatedBlocks != null) : "locatedBlocks is null"; - - final LocatedBlock blk; - - //check offset - if (offset < 0 || offset >= getFileLength()) { - throw new IOException("offset < 0 || offset >= getFileLength(), offset=" - + offset - + ", locatedBlocks=" + locatedBlocks); - } - else if (offset >= locatedBlocks.getFileLength()) { - // offset to the portion of the last block, - // which is not known to the name-node yet; - // getting the last block - blk = locatedBlocks.getLastLocatedBlock(); - } - else { - // search cached blocks first - int targetBlockIdx = locatedBlocks.findBlock(offset); - if (targetBlockIdx < 0) { // block is not cached - targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); - // fetch more blocks - final LocatedBlocks newBlocks = 
dfsClient.getLocatedBlocks(src, offset); - assert (newBlocks != null) : "Could not find target position " + offset; - locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); - } - blk = locatedBlocks.get(targetBlockIdx); - } - return blk; - } - } - - /** Fetch a block from namenode and cache it */ - protected void fetchBlockAt(long offset) throws IOException { - synchronized(infoLock) { - int targetBlockIdx = locatedBlocks.findBlock(offset); - if (targetBlockIdx < 0) { // block is not cached - targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); - } - // fetch blocks - final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset); - if (newBlocks == null) { - throw new IOException("Could not find target position " + offset); - } - locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); - } - } - - /** - * Get blocks in the specified range. - * Fetch them from the namenode if not cached. This function - * will not get a read request beyond the EOF. 
- * @param offset starting offset in file - * @param length length of data - * @return consequent segment of located blocks - * @throws IOException - */ - private List<LocatedBlock> getBlockRange(long offset, - long length) throws IOException { - // getFileLength(): returns total file length - // locatedBlocks.getFileLength(): returns length of completed blocks - if (offset >= getFileLength()) { - throw new IOException("Offset: " + offset + - " exceeds file length: " + getFileLength()); - } - synchronized(infoLock) { - final List<LocatedBlock> blocks; - final long lengthOfCompleteBlk = locatedBlocks.getFileLength(); - final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk; - final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk; - - if (readOffsetWithinCompleteBlk) { - //get the blocks of finalized (completed) block range - blocks = getFinalizedBlockRange(offset, - Math.min(length, lengthOfCompleteBlk - offset)); - } else { - blocks = new ArrayList<LocatedBlock>(1); - } - - // get the blocks from incomplete block range - if (readLengthPastCompleteBlk) { - blocks.add(locatedBlocks.getLastLocatedBlock()); - } - - return blocks; - } - } - - /** - * Get blocks in the specified range. - * Includes only the complete blocks. - * Fetch them from the namenode if not cached. 
- */ - private List<LocatedBlock> getFinalizedBlockRange( - long offset, long length) throws IOException { - synchronized(infoLock) { - assert (locatedBlocks != null) : "locatedBlocks is null"; - List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>(); - // search cached blocks first - int blockIdx = locatedBlocks.findBlock(offset); - if (blockIdx < 0) { // block is not cached - blockIdx = LocatedBlocks.getInsertIndex(blockIdx); - } - long remaining = length; - long curOff = offset; - while(remaining > 0) { - LocatedBlock blk = null; - if(blockIdx < locatedBlocks.locatedBlockCount()) - blk = locatedBlocks.get(blockIdx); - if (blk == null || curOff < blk.getStartOffset()) { - LocatedBlocks newBlocks; - newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining); - locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks()); - continue; - } - assert curOff >= blk.getStartOffset() : "Block not found"; - blockRange.add(blk); - long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff; - remaining -= bytesRead; - curOff += bytesRead; - blockIdx++; - } - return blockRange; - } - } - - /** - * Open a DataInputStream to a DataNode so that it can be read from. - * We get block ID and the IDs of the destinations at startup, from the namenode. - */ - private synchronized DatanodeInfo blockSeekTo(long target) throws IOException { - if (target >= getFileLength()) { - throw new IOException("Attempted to read past end of file"); - } - - // Will be getting a new BlockReader. 
- closeCurrentBlockReaders(); - - // - // Connect to best DataNode for desired Block, with potential offset - // - DatanodeInfo chosenNode = null; - int refetchToken = 1; // only need to get a new access token once - int refetchEncryptionKey = 1; // only need to get a new encryption key once - - boolean connectFailedOnce = false; - - while (true) { - // - // Compute desired block - // - LocatedBlock targetBlock = getBlockAt(target); - - // update current position - this.pos = target; - this.blockEnd = targetBlock.getStartOffset() + - targetBlock.getBlockSize() - 1; - this.currentLocatedBlock = targetBlock; - - long offsetIntoBlock = target - targetBlock.getStartOffset(); - - DNAddrPair retval = chooseDataNode(targetBlock, null); - chosenNode = retval.info; - InetSocketAddress targetAddr = retval.addr; - StorageType storageType = retval.storageType; - - try { - blockReader = getBlockReader(targetBlock, offsetIntoBlock, - targetBlock.getBlockSize() - offsetIntoBlock, targetAddr, - storageType, chosenNode); - if(connectFailedOnce) { - DFSClient.LOG.info("Successfully connected to " + targetAddr + - " for " + targetBlock.getBlock()); - } - return chosenNode; - } catch (IOException ex) { - if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { - DFSClient.LOG.info("Will fetch a new encryption key and retry, " - + "encryption key was invalid when connecting to " + targetAddr - + " : " + ex); - // The encryption key used is invalid. - refetchEncryptionKey--; - dfsClient.clearDataEncryptionKey(); - } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) { - refetchToken--; - fetchBlockAt(target); - } else { - connectFailedOnce = true; - DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block" - + ", add to deadNodes and continue. 
" + ex, ex); - // Put chosen node into dead list, continue - addToDeadNodes(chosenNode); - } - } - } - } - - protected BlockReader getBlockReader(LocatedBlock targetBlock, - long offsetInBlock, long length, InetSocketAddress targetAddr, - StorageType storageType, DatanodeInfo datanode) throws IOException { - ExtendedBlock blk = targetBlock.getBlock(); - Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken(); - CachingStrategy curCachingStrategy; - boolean shortCircuitForbidden; - synchronized (infoLock) { - curCachingStrategy = cachingStrategy; - shortCircuitForbidden = shortCircuitForbidden(); - } - return new BlockReaderFactory(dfsClient.getConf()). - setInetSocketAddress(targetAddr). - setRemotePeerFactory(dfsClient). - setDatanodeInfo(datanode). - setStorageType(storageType). - setFileName(src). - setBlock(blk). - setBlockToken(accessToken). - setStartOffset(offsetInBlock). - setVerifyChecksum(verifyChecksum). - setClientName(dfsClient.clientName). - setLength(length). - setCachingStrategy(curCachingStrategy). - setAllowShortCircuitLocalReads(!shortCircuitForbidden). - setClientCacheContext(dfsClient.getClientContext()). - setUserGroupInformation(dfsClient.ugi). - setConfiguration(dfsClient.getConfiguration()). - build(); - } - - /** - * Close it down! - */ - @Override - public synchronized void close() throws IOException { - if (!closed.compareAndSet(false, true)) { - DFSClient.LOG.debug("DFSInputStream has been closed already"); - return; - } - dfsClient.checkOpen(); - - if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) { - final StringBuilder builder = new StringBuilder(); - extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() { - private String prefix = ""; - @Override - public void accept(ByteBuffer k, Object v) { - builder.append(prefix).append(k); - prefix = ", "; - } - }); - DFSClient.LOG.warn("closing file " + src + ", but there are still " + - "unreleased ByteBuffers allocated by read(). 
" + - "Please release " + builder.toString() + "."); - } - closeCurrentBlockReaders(); - super.close(); - } - - @Override - public synchronized int read() throws IOException { - if (oneByteBuf == null) { - oneByteBuf = new byte[1]; - } - int ret = read( oneByteBuf, 0, 1 ); - return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff); - } - - /** - * Wraps different possible read implementations so that readBuffer can be - * strategy-agnostic. - */ - interface ReaderStrategy { - public int doRead(BlockReader blockReader, int off, int len) - throws ChecksumException, IOException; - - /** - * Copy data from the src ByteBuffer into the read buffer. - * @param src The src buffer where the data is copied from - * @param offset Useful only when the ReadStrategy is based on a byte array. - * Indicate the offset of the byte array for copy. - * @param length Useful only when the ReadStrategy is based on a byte array. - * Indicate the length of the data to copy. - */ - public int copyFrom(ByteBuffer src, int offset, int length); - } - - protected void updateReadStatistics(ReadStatistics readStatistics, - int nRead, BlockReader blockReader) { - if (nRead <= 0) return; - synchronized(infoLock) { - if (blockReader.isShortCircuit()) { - readStatistics.addShortCircuitBytes(nRead); - } else if (blockReader.isLocal()) { - readStatistics.addLocalBytes(nRead); - } else { - readStatistics.addRemoteBytes(nRead); - } - } - } - - /** - * Used to read bytes into a byte[] - */ - private class ByteArrayStrategy implements ReaderStrategy { - final byte[] buf; - - public ByteArrayStrategy(byte[] buf) { - this.buf = buf; - } - - @Override - public int doRead(BlockReader blockReader, int off, int len) - throws ChecksumException, IOException { - int nRead = blockReader.read(buf, off, len); - updateReadStatistics(readStatistics, nRead, blockReader); - return nRead; - } - - @Override - public int copyFrom(ByteBuffer src, int offset, int length) { - ByteBuffer writeSlice = src.duplicate(); - 
writeSlice.get(buf, offset, length); - return length; - } - } - - /** - * Used to read bytes into a user-supplied ByteBuffer - */ - protected class ByteBufferStrategy implements ReaderStrategy { - final ByteBuffer buf; - ByteBufferStrategy(ByteBuffer buf) { - this.buf = buf; - } - - @Override - public int doRead(BlockReader blockReader, int off, int len) - throws ChecksumException, IOException { - int oldpos = buf.position(); - int oldlimit = buf.limit(); - boolean success = false; - try { - int ret = blockReader.read(buf); - success = true; - updateReadStatistics(readStatistics, ret, blockReader); - if (ret == 0) { - DFSClient.LOG.warn("zero"); - } - return ret; - } finally { - if (!success) { - // Reset to original state so that retries work correctly. - buf.position(oldpos); - buf.limit(oldlimit); - } - } - } - - @Override - public int copyFrom(ByteBuffer src, int offset, int length) { - ByteBuffer writeSlice = src.duplicate(); - int remaining = Math.min(buf.remaining(), writeSlice.remaining()); - writeSlice.limit(writeSlice.position() + remaining); - buf.put(writeSlice); - return remaining; - } - } - - /* This is a used by regular read() and handles ChecksumExceptions. - * name readBuffer() is chosen to imply similarity to readBuffer() in - * ChecksumFileSystem - */ - private synchronized int readBuffer(ReaderStrategy reader, int off, int len, - Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) - throws IOException { - IOException ioe; - - /* we retry current node only once. So this is set to true only here. - * Intention is to handle one common case of an error that is not a - * failure on datanode or client : when DataNode closes the connection - * since client is idle. If there are other cases of "non-errors" then - * then a datanode might be retried by setting this to true again. - */ - boolean retryCurrentNode = true; - - while (true) { - // retry as many times as seekToNewSource allows. 
- try { - return reader.doRead(blockReader, off, len); - } catch ( ChecksumException ce ) { - DFSClient.LOG.warn("Found Checksum error for " - + getCurrentBlock() + " from " + currentNode - + " at " + ce.getPos()); - ioe = ce; - retryCurrentNode = false; - // we want to remember which block replicas we have tried - addIntoCorruptedBlockMap(getCurrentBlock(), currentNode, - corruptedBlockMap); - } catch ( IOException e ) { - if (!retryCurrentNode) { - DFSClient.LOG.warn("Exception while reading from " - + getCurrentBlock() + " of " + src + " from " - + currentNode, e); - } - ioe = e; - } - boolean sourceFound = false; - if (retryCurrentNode) { - /* possibly retry the same node so that transient errors don't - * result in application level failures (e.g. Datanode could have - * closed the connection because the client is idle for too long). - */ - sourceFound = seekToBlockSource(pos); - } else { - addToDeadNodes(currentNode); - sourceFound = seekToNewSource(pos); - } - if (!sourceFound) { - throw ioe; - } - retryCurrentNode = false; - } - } - - protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException { - dfsClient.checkOpen(); - if (closed.get()) { - throw new IOException("Stream closed"); - } - Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap - = new HashMap<ExtendedBlock, Set<DatanodeInfo>>(); - failures = 0; - if (pos < getFileLength()) { - int retries = 2; - while (retries > 0) { - try { - // currentNode can be left as null if previous read had a checksum - // error on the same block. 
See HDFS-3067 - if (pos > blockEnd || currentNode == null) { - currentNode = blockSeekTo(pos); - } - int realLen = (int) Math.min(len, (blockEnd - pos + 1L)); - synchronized(infoLock) { - if (locatedBlocks.isLastBlockComplete()) { - realLen = (int) Math.min(realLen, - locatedBlocks.getFileLength() - pos); - } - } - int result = readBuffer(strategy, off, realLen, corruptedBlockMap); - - if (result >= 0) { - pos += result; - } else { - // got a EOS from reader though we expect more data on it. - throw new IOException("Unexpected EOS from the reader"); - } - if (dfsClient.stats != null) { - dfsClient.stats.incrementBytesRead(result); - } - return result; - } catch (ChecksumException ce) { - throw ce; - } catch (IOException e) { - if (retries == 1) { - DFSClient.LOG.warn("DFS Read", e); - } - blockEnd = -1; - if (currentNode != null) { addToDeadNodes(currentNode); } - if (--retries == 0) { - throw e; - } - } finally { - // Check if need to report block replicas corruption either read - // was successful or ChecksumException occured. - reportCheckSumFailure(corruptedBlockMap, - currentLocatedBlock.getLocations().length); - } - } - } - return -1; - } - - /** - * Read the entire buffer. - */ - @Override - public synchronized int read(final byte buf[], int off, int len) throws IOException { - ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf); - TraceScope scope = - dfsClient.getPathTraceScope("DFSInputStream#byteArrayRead", src); - try { - return readWithStrategy(byteArrayReader, off, len); - } finally { - scope.close(); - } - } - - @Override - public synchronized int read(final ByteBuffer buf) throws IOException { - ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf); - TraceScope scope = - dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src); - try { - return readWithStrategy(byteBufferReader, 0, buf.remaining()); - } finally { - scope.close(); - } - } - - - /** - * Add corrupted block replica into map. 
- */ - protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, - Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { - Set<DatanodeInfo> dnSet = null; - if((corruptedBlockMap.containsKey(blk))) { - dnSet = corruptedBlockMap.get(blk); - }else { - dnSet = new HashSet<DatanodeInfo>(); - } - if (!dnSet.contains(node)) { - dnSet.add(node); - corruptedBlockMap.put(blk, dnSet); - } - } - - private DNAddrPair chooseDataNode(LocatedBlock block, - Collection<DatanodeInfo> ignoredNodes) throws IOException { - while (true) { - DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes); - if (result != null) { - return result; - } else { - String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), - deadNodes, ignoredNodes); - String blockInfo = block.getBlock() + " file=" + src; - if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) { - String description = "Could not obtain block: " + blockInfo; - DFSClient.LOG.warn(description + errMsg - + ". Throwing a BlockMissingException"); - throw new BlockMissingException(src, description, - block.getStartOffset()); - } - - DatanodeInfo[] nodes = block.getLocations(); - if (nodes == null || nodes.length == 0) { - DFSClient.LOG.info("No node available for " + blockInfo); - } - DFSClient.LOG.info("Could not obtain " + block.getBlock() - + " from any node: " + errMsg - + ". Will get new block locations from namenode and retry..."); - try { - // Introducing a random factor to the wait time before another retry. - // The wait time is dependent on # of failures and a random factor. - // At the first time of getting a BlockMissingException, the wait time - // is a random number between 0..3000 ms. If the first retry - // still fails, we will wait 3000 ms grace period before the 2nd retry. - // Also at the second retry, the waiting window is expanded to 6000 ms - // alleviating the request rate from the server. 
Similarly the 3rd retry - // will wait 6000ms grace period before retry and the waiting window is - // expanded to 9000ms. - final int timeWindow = dfsClient.getConf().getTimeWindow(); - double waitTime = timeWindow * failures + // grace period for the last round of attempt - // expanding time window for each failure - timeWindow * (failures + 1) * - ThreadLocalRandom.current().nextDouble(); - DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); - Thread.sleep((long)waitTime); - } catch (InterruptedException iex) { - } - deadNodes.clear(); //2nd option is to remove only nodes[blockId] - openInfo(true); - block = refreshLocatedBlock(block); - failures++; - } - } - } - - /** - * Get the best node from which to stream the data. - * @param block LocatedBlock, containing nodes in priority order. - * @param ignoredNodes Do not choose nodes in this array (may be null) - * @return The DNAddrPair of the best node. Null if no node can be chosen. - */ - protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block, - Collection<DatanodeInfo> ignoredNodes) { - DatanodeInfo[] nodes = block.getLocations(); - StorageType[] storageTypes = block.getStorageTypes(); - DatanodeInfo chosenNode = null; - StorageType storageType = null; - if (nodes != null) { - for (int i = 0; i < nodes.length; i++) { - if (!deadNodes.containsKey(nodes[i]) - && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) { - chosenNode = nodes[i]; - // Storage types are ordered to correspond with nodes, so use the same - // index to get storage type. 
- if (storageTypes != null && i < storageTypes.length) { - storageType = storageTypes[i]; - } - break; - } - } - } - if (chosenNode == null) { - DFSClient.LOG.warn("No live nodes contain block " + block.getBlock() + - " after checking nodes = " + Arrays.toString(nodes) + - ", ignoredNodes = " + ignoredNodes); - return null; - } - final String dnAddr = - chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname()); - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Connecting to datanode " + dnAddr); - } - InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr); - return new DNAddrPair(chosenNode, targetAddr, storageType); - } - - private static String getBestNodeDNAddrPairErrorString( - DatanodeInfo nodes[], AbstractMap<DatanodeInfo, - DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) { - StringBuilder errMsgr = new StringBuilder( - " No live nodes contain current block "); - errMsgr.append("Block locations:"); - for (DatanodeInfo datanode : nodes) { - errMsgr.append(" "); - errMsgr.append(datanode.toString()); - } - errMsgr.append(" Dead nodes: "); - for (DatanodeInfo datanode : deadNodes.keySet()) { - errMsgr.append(" "); - errMsgr.append(datanode.toString()); - } - if (ignoredNodes != null) { - errMsgr.append(" Ignored nodes: "); - for (DatanodeInfo datanode : ignoredNodes) { - errMsgr.append(" "); - errMsgr.append(datanode.toString()); - } - } - return errMsgr.toString(); - } - - protected void fetchBlockByteRange(LocatedBlock block, long start, long end, - byte[] buf, int offset, - Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) - throws IOException { - block = refreshLocatedBlock(block); - while (true) { - DNAddrPair addressPair = chooseDataNode(block, null); - try { - actualGetFromOneDataNode(addressPair, block, start, end, - buf, offset, corruptedBlockMap); - return; - } catch (IOException e) { - // Ignore. Already processed inside the function. - // Loop through to try the next node. 
- } - } - } - - private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode, - final LocatedBlock block, final long start, final long end, - final ByteBuffer bb, - final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, - final int hedgedReadId) { - final Span parentSpan = Trace.currentSpan(); - return new Callable<ByteBuffer>() { - @Override - public ByteBuffer call() throws Exception { - byte[] buf = bb.array(); - int offset = bb.position(); - TraceScope scope = - Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan); - try { - actualGetFromOneDataNode(datanode, block, start, end, buf, - offset, corruptedBlockMap); - return bb; - } finally { - scope.close(); - } - } - }; - } - - /** - * Used when reading contiguous blocks - */ - private void actualGetFromOneDataNode(final DNAddrPair datanode, - LocatedBlock block, final long start, final long end, byte[] buf, - int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) - throws IOException { - final int length = (int) (end - start + 1); - actualGetFromOneDataNode(datanode, block, start, end, buf, - new int[]{offset}, new int[]{length}, corruptedBlockMap); - } - - /** - * Read data from one DataNode. - * @param datanode the datanode from which to read data - * @param block the located block containing the requested data - * @param startInBlk the startInBlk offset of the block - * @param endInBlk the endInBlk offset of the block - * @param buf the given byte array into which the data is read - * @param offsets the data may be read into multiple segments of the buf - * (when reading a striped block). this array indicates the - * offset of each buf segment. 
- * @param lengths the length of each buf segment - * @param corruptedBlockMap map recording list of datanodes with corrupted - * block replica - */ - void actualGetFromOneDataNode(final DNAddrPair datanode, - LocatedBlock block, final long startInBlk, final long endInBlk, - byte[] buf, int[] offsets, int[] lengths, - Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) - throws IOException { - DFSClientFaultInjector.get().startFetchFromDatanode(); - int refetchToken = 1; // only need to get a new access token once - int refetchEncryptionKey = 1; // only need to get a new encryption key once - final int len = (int) (endInBlk - startInBlk + 1); - checkReadPortions(offsets, lengths, len); - - while (true) { - // cached block locations may have been updated by chooseDataNode() - // or fetchBlockAt(). Always get the latest list of locations at the - // start of the loop. - block = refreshLocatedBlock(block); - BlockReader reader = null; - try { - DFSClientFaultInjector.get().fetchFromDatanodeException(); - reader = getBlockReader(block, startInBlk, len, datanode.addr, - datanode.storageType, datanode.info); - for (int i = 0; i < offsets.length; i++) { - int nread = reader.readAll(buf, offsets[i], lengths[i]); - updateReadStatistics(readStatistics, nread, reader); - if (nread != lengths[i]) { - throw new IOException("truncated return from reader.read(): " + - "excpected " + lengths[i] + ", got " + nread); - } - } - DFSClientFaultInjector.get().readFromDatanodeDelay(); - return; - } catch (ChecksumException e) { - String msg = "fetchBlockByteRange(). 
Got a checksum exception for " - + src + " at " + block.getBlock() + ":" + e.getPos() + " from " - + datanode.info; - DFSClient.LOG.warn(msg); - // we want to remember what we have tried - addIntoCorruptedBlockMap(block.getBlock(), datanode.info, - corruptedBlockMap); - addToDeadNodes(datanode.info); - throw new IOException(msg); - } catch (IOException e) { - if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { - DFSClient.LOG.info("Will fetch a new encryption key and retry, " - + "encryption key was invalid when connecting to " + datanode.addr - + " : " + e); - // The encryption key used is invalid. - refetchEncryptionKey--; - dfsClient.clearDataEncryptionKey(); - } else if (refetchToken > 0 && tokenRefetchNeeded(e, datanode.addr)) { - refetchToken--; - try { - fetchBlockAt(block.getStartOffset()); - } catch (IOException fbae) { - // ignore IOE, since we can retry it later in a loop - } - } else { - String msg = "Failed to connect to " + datanode.addr + " for file " - + src + " for block " + block.getBlock() + ":" + e; - DFSClient.LOG.warn("Connection failure: " + msg, e); - addToDeadNodes(datanode.info); - throw new IOException(msg); - } - } finally { - if (reader != null) { - reader.close(); - } - } - } - } - - /** - * Refresh cached block locations. - * @param block The currently cached block locations - * @return Refreshed block locations - * @throws IOException - */ - protected LocatedBlock refreshLocatedBlock(LocatedBlock block) - throws IOException { - return getBlockAt(block.getStartOffset()); - } - - /** - * This method verifies that the read portions are valid and do not overlap - * with each other. 
- */ - private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) { - Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0); - int sum = 0; - for (int i = 0; i < lengths.length; i++) { - if (i > 0) { - int gap = offsets[i] - offsets[i - 1]; - // make sure read portions do not overlap with each other - Preconditions.checkArgument(gap >= lengths[i - 1]); - } - sum += lengths[i]; - } - Preconditions.checkArgument(sum == totalLen); - } - - /** - * Like {@link #fetchBlockByteRange}except we start up a second, parallel, - * 'hedged' read if the first read is taking longer than configured amount of - * time. We then wait on which ever read returns first. - */ - private void hedgedFetchBlockByteRange(LocatedBlock block, long start, - long end, byte[] buf, int offset, - Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) - throws IOException { - final DfsClientConf conf = dfsClient.getConf(); - ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>(); - CompletionService<ByteBuffer> hedgedService = - new ExecutorCompletionService<ByteBuffer>( - dfsClient.getHedgedReadsThreadPool()); - ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>(); - ByteBuffer bb = null; - int len = (int) (end - start + 1); - int hedgedReadId = 0; - block = refreshLocatedBlock(block); - while (true) { - // see HDFS-6591, this metric is used to verify/catch unnecessary loops - hedgedReadOpsLoopNumForTesting++; - DNAddrPair chosenNode = null; - // there is no request already executing. - if (futures.isEmpty()) { - // chooseDataNode is a commitment. If no node, we go to - // the NN to reget block locations. Only go here on first read. 
- chosenNode = chooseDataNode(block, ignored); - bb = ByteBuffer.wrap(buf, offset, len); - Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( - chosenNode, block, start, end, bb, - corruptedBlockMap, hedgedReadId++); - Future<ByteBuffer> firstRequest = hedgedService - .submit(getFromDataNodeCallable); - futures.add(firstRequest); - try { - Future<ByteBuffer> future = hedgedService.poll( - conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS); - if (future != null) { - future.get(); - return; - } - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Waited " + conf.getHedgedReadThresholdMillis() - + "ms to read from " + chosenNode.info - + "; spawning hedged read"); - } - // Ignore this node on next go around. - ignored.add(chosenNode.info); - dfsClient.getHedgedReadMetrics().incHedgedReadOps(); - continue; // no need to refresh block locations - } catch (InterruptedException e) { - // Ignore - } catch (ExecutionException e) { - // Ignore already logged in the call. - } - } else { - // We are starting up a 'hedged' read. We have a read already - // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode. - // If no nodes to do hedged reads against, pass. - try { - chosenNode = getBestNodeDNAddrPair(block, ignored); - if (chosenNode == null) { - chosenNode = chooseDataNode(block, ignored); - } - bb = ByteBuffer.allocate(len); - Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( - chosenNode, block, start, end, bb, - corruptedBlockMap, hedgedReadId++); - Future<ByteBuffer> oneMoreRequest = hedgedService - .submit(getFromDataNodeCallable); - futures.add(oneMoreRequest); - } catch (IOException ioe) { - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Failed getting node for hedged read: " - + ioe.getMessage()); - } - } - // if not succeeded. Submit callables for each datanode in a loop, wait - // for a fixed interval and get the result from the fastest one. 
- try { - ByteBuffer result = getFirstToComplete(hedgedService, futures); - // cancel the rest. - cancelAll(futures); - if (result.array() != buf) { // compare the array pointers - dfsClient.getHedgedReadMetrics().incHedgedReadWins(); - System.arraycopy(result.array(), result.position(), buf, offset, - len); - } else { - dfsClient.getHedgedReadMetrics().incHedgedReadOps(); - } - return; - } catch (InterruptedException ie) { - // Ignore and retry - } - // We got here if exception. Ignore this node on next go around IFF - // we found a chosenNode to hedge read against. - if (chosenNode != null && chosenNode.info != null) { - ignored.add(chosenNode.info); - } - } - } - } - - @VisibleForTesting - public long getHedgedReadOpsLoopNumForTesting() { - return hedgedReadOpsLoopNumForTesting; - } - - private ByteBuffer getFirstToComplete( - CompletionService<ByteBuffer> hedgedService, - ArrayList<Future<ByteBuffer>> futures) throws InterruptedException { - if (futures.isEmpty()) { - throw new InterruptedException("let's retry"); - } - Future<ByteBuffer> future = null; - try { - future = hedgedService.take(); - ByteBuffer bb = future.get(); - futures.remove(future); - return bb; - } catch (ExecutionException e) { - // already logged in the Callable - futures.remove(future); - } catch (CancellationException ce) { - // already logged in the Callable - futures.remove(future); - } - - throw new InterruptedException("let's retry"); - } - - private void cancelAll(List<Future<ByteBuffer>> futures) { - for (Future<ByteBuffer> future : futures) { - // Unfortunately, hdfs reads do not take kindly to interruption. - // Threads return a variety of interrupted-type exceptions but - // also complaints about invalid pbs -- likely because read - // is interrupted before gets whole pb. Also verbose WARN - // logging. So, for now, do not interrupt running read. 
- future.cancel(false); - } - } - - /** - * Should the block access token be refetched on an exception - * - * @param ex Exception received - * @param targetAddr Target datanode address from where exception was received - * @return true if block access token has expired or invalid and it should be - * refetched - */ - protected static boolean tokenRefetchNeeded(IOException ex, - InetSocketAddress targetAddr) { - /* - * Get a new access token and retry. Retry is needed in 2 cases. 1) - * When both NN and DN re-started while DFSClient holding a cached - * access token. 2) In the case that NN fails to update its - * access key at pre-set interval (by a wide margin) and - * subsequently restarts. In this case, DN re-registers itself with - * NN and receives a new access key, but DN will delete the old - * access key from its memory since it's considered expired based on - * the estimated expiration date. - */ - if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) { - DFSClient.LOG.info("Access token was invalid when connecting to " - + targetAddr + " : " + ex); - return true; - } - return false; - } - - /** - * Read bytes starting from the specified position. 
- * - * @param position start read from this position - * @param buffer read buffer - * @param offset offset into buffer - * @param length number of bytes to read - * - * @return actual number of bytes read - */ - @Override - public int read(long position, byte[] buffer, int offset, int length) - throws IOException { - TraceScope scope = - dfsClient.getPathTraceScope("DFSInputStream#byteArrayPread", src); - try { - return pread(position, buffer, offset, length); - } finally { - scope.close(); - } - } - - private int pread(long position, byte[] buffer, int offset, int length) - throws IOException { - // sanity checks - dfsClient.checkOpen(); - if (closed.get()) { - throw new IOException("Stream closed"); - } - failures = 0; - long filelen = getFileLength(); - if ((position < 0) || (position >= filelen)) { - return -1; - } - int realLen = length; - if ((position + length) > filelen) { - realLen = (int)(filelen - position); - } - - // determine the block and byte range within the block - // corresponding to position and realLen - List<LocatedBlock> blockRange = getBlockRange(position, realLen); - int remaining = realLen; - Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap - = new HashMap<ExtendedBlock, Set<DatanodeInfo>>(); - for (LocatedBlock blk : blockRange) { - long targetStart = position - blk.getStartOffset(); - long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); - try { - if (dfsClient.isHedgedReadsEnabled()) { - hedgedFetchBlockByteRange(blk, targetStart, - targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap); - } else { - fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, - buffer, offset, corruptedBlockMap); - } - } finally { - // Check and report if any block replicas are corrupted. - // BlockMissingException may be caught if all block replicas are - // corrupted. 
- reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length); - } - - remaining -= bytesToRead; - position += bytesToRead; - offset += bytesToRead; - } - assert remaining == 0 : "Wrong number of bytes read."; - if (dfsClient.stats != null) { - dfsClient.stats.incrementBytesRead(realLen); - } - return realLen; - } - - /** - * DFSInputStream reports checksum failure. - * Case I : client has tried multiple data nodes and at least one of the - * attempts has succeeded. We report the other failures as corrupted block to - * namenode. - * Case II: client has tried out all data nodes, but all failed. We - * only report if the total number of replica is 1. We do not - * report otherwise since this maybe due to the client is a handicapped client - * (who can not read). - * @param corruptedBlockMap map of corrupted blocks - * @param dataNodeCount number of data nodes who contains the block replicas - */ - protected void reportCheckSumFailure( - Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, - int dataNodeCount) { - if (corruptedBlockMap.isEmpty()) { - return; - } - Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap - .entrySet().iterator(); - Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next(); - ExtendedBlock blk = entry.getKey(); - Set<DatanodeInfo> dnSet = entry.getValue(); - if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0)) - || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) { - DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()]; - int i = 0; - for (DatanodeInfo dn:dnSet) { - locs[i++] = dn; - } - LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) }; - dfsClient.reportChecksumFailure(src, lblocks); - } - corruptedBlockMap.clear(); - } - - @Override - public long skip(long n) throws IOException { - if ( n > 0 ) { - long curPos = getPos(); - long fileLen = getFileLength(); - if( n+curPos > fileLen ) { - n = fileLen - curPos; - } - seek(curPos+n); - return n; - } - return n < 0 ? 
-1 : 0; - } - - /** - * Seek to a new arbitrary location - */ - @Override - public synchronized void seek(long targetPos) throws IOException { - if (targetPos > getFileLength()) { - throw new EOFException("Cannot seek after EOF"); - } - if (targetPos < 0) { - throw new EOFException("Cannot seek to negative offset"); - } - if (closed.get()) { - throw new IOException("Stream is closed!"); - } - boolean done = false; - if (pos <= targetPos && targetPos <= blockEnd) { - // - // If this seek is to a positive position in the current - // block, and this piece of data might already be lying in - // the TCP buffer, then just eat up the intervening data. - // - int diff = (int)(targetPos - pos); - if (diff <= blockReader.available()) { - try { - pos += blockReader.skip(diff); - if (pos == targetPos) { - done = true; - } else { - // The range was already checked. If the block reader returns - // something unexpected instead of throwing an exception, it is - // most likely a bug. - String errMsg = "BlockReader failed to seek to " + - targetPos + ". Instead, it seeked to " + pos + "."; - DFSClient.LOG.warn(errMsg); - throw new IOException(errMsg); - } - } catch (IOException e) {//make following read to retry - if(DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Exception while seek to " + targetPos - + " from " + getCurrentBlock() + " of " + src + " from " - + currentNode, e); - } - } - } - } - if (!done) { - pos = targetPos; - blockEnd = -1; - } - } - - /** - * Same as {@link #seekToNewSource(long)} except that it does not exclude - * the current datanode and might connect to the same node. - */ - private boolean seekToBlockSource(long targetPos) - throws IOException { - currentNode = blockSeekTo(targetPos); - return true; - } - - /** - * Seek to given position on a node other than the current node. If - * a node other than the current node is found, then returns true. - * If another node could not be found, then returns false. 
- */ - @Override - public synchronized boolean seekToNewSource(long targetPos) throws IOException { - if (currentNode == null) { - return seekToBlockSource(targetPos); - } - boolean markedDead = deadNodes.containsKey(currentNode); - addToDeadNodes(currentNode); - DatanodeInfo oldNode = currentNode; - DatanodeInfo newNode = blockSeekTo(targetPos); - if (!markedDead) { - /* remove it from deadNodes. blockSeekTo could have cleared - * deadNodes and added currentNode again. Thats ok. */ - deadNodes.remove(oldNode); - } - if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) { - currentNode = newNode; - return true; - } else { - return false; - } - } - - /** - */ - @Override - public synchronized long getPos() { - return pos; - } - - /** Return the size of the remaining available bytes - * if the size is less than or equal to {@link Integer#MAX_VALUE}, - * otherwise, return {@link Integer#MAX_VALUE}. - */ - @Override - public synchronized int available() throws IOException { - if (closed.get()) { - throw new IOException("Stream closed"); - } - - final long remaining = getFileLength() - pos; - return remaining <= Integer.MAX_VALUE? (int)remaining: Integer.MAX_VALUE; - } - - /** - * We definitely don't support marks - */ - @Override - public boolean markSupported() { - return false; - } - @Override - public void mark(int readLimit) { - } - @Override - public void reset() throws IOException { - throw new IOException("Mark/reset not supported"); - } - - /** Utility class to encapsulate data node info and its address. */ - static final class DNAddrPair { - final DatanodeInfo info; - final InetSocketAddress addr; - final StorageType storageType; - - DNAddrPair(DatanodeInfo info, InetSocketAddress addr, - StorageType storageType) { - this.info = info; - this.addr = addr; - this.storageType = storageType; - } - } - - /** - * Get statistics about the reads which this DFSInputStream has done. 
- */ - public ReadStatistics getReadStatistics() { - synchronized(infoLock) { - return new ReadStatistics(readStatistics); - } - } - - /** - * Clear statistics about the reads which this DFSInputStream has done. - */ - public void clearReadStatistics() { - synchronized(infoLock) { - readStatistics.clear(); - } - } - - public FileEncryptionInfo getFileEncryptionInfo() { - synchronized(infoLock) { - return fileEncryptionInfo; - } - } - - protected void closeCurrentBlockReaders() { - if (blockReader == null) return; - // Close the current block reader so that the new caching settings can - // take effect immediately. - try { - blockReader.close(); - } catch (IOException e) { - DFSClient.LOG.error("error closing blockReader", e); - } - blockReader = null; - blockEnd = -1; - } - - @Override - public synchronized void setReadahead(Long readahead) - throws IOException { - synchronized (infoLock) { - this.cachingStrategy = - new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build(); - } - closeCurrentBlockReaders(); - } - - @Override - public synchronized void setDropBehind(Boolean dropBehind) - throws IOException { - synchronized (infoLock) { - this.cachingStrategy = - new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build(); - } - closeCurrentBlockReaders(); - } - - /** - * The immutable empty buffer we return when we reach EOF when doing a - * zero-copy read. 
- */ - private static final ByteBuffer EMPTY_BUFFER = - ByteBuffer.allocateDirect(0).asReadOnlyBuffer(); - - @Override - public synchronized ByteBuffer read(ByteBufferPool bufferPool, - int maxLength, EnumSet<ReadOption> opts) - throws IOException, UnsupportedOperationException { - if (maxLength == 0) { - return EMPTY_BUFFER; - } else if (maxLength < 0) { - throw new IllegalArgumentException("can't read a negative " + - "number of bytes."); - } - if ((blockReader == null) || (blockEnd == -1)) { - if (pos >= getFileLength()) { - return null; - } - /* - * If we don't have a blockReader, or the one we have has no more bytes - * left to read, we call seekToBlockSource to get a new blockReader and - * recalculate blockEnd. Note that we assume we're not at EOF here - * (we check this above). - */ - if ((!seekToBlockSource(pos)) || (blockReader == null)) { - throw new IOException("failed to allocate new BlockReader " + - "at position " + pos); - } - } - ByteBuffer buffer = null; - if (dfsClient.getConf().getShortCircuitConf().isShortCircuitMmapEnabled()) { - buffer = tryReadZeroCopy(maxLength, opts); - } - if (buffer != null) { - return buffer; - } - buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength); - if (buffer != null) { - getExtendedReadBuffers().put(buffer, bufferPool); - } - return buffer; - } - - private synchronized ByteBuffer tryReadZeroCopy(int maxLength, - EnumSet<ReadOption> opts) throws IOException { - // Copy 'pos' and 'blockEnd' to local variables to make it easier for the - // JVM to optimize this function. - final long curPos = pos; - final long curEnd = blockEnd; - final long blockStartInFile = currentLocatedBlock.getStartOffset(); - final long blockPos = curPos - blockStartInFile; - - // Shorten this read if the end of the block is nearby. 
- long length63; - if ((curPos + maxLength) <= (curEnd + 1)) { - length63 = maxLength; - } else { - length63 = 1 + curEnd - curPos; - if (length63 <= 0) { - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " + - curPos + " of " + src + "; " + length63 + " bytes left in block. " + - "blockPos=" + blockPos + "; curPos=" + curPos + - "; curEnd=" + curEnd); - } - return null; - } - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Reducing read length from " + maxLength + - " to " + length63 + " to avoid going more than one byte " + - "past the end of the block. blockPos=" + blockPos + - "; curPos=" + curPos + "; curEnd=" + curEnd); - } - } - // Make sure that don't go beyond 31-bit offsets in the MappedByteBuffer. - int length; - if (blockPos + length63 <= Integer.MAX_VALUE) { - length = (int)length63; - } else { - long length31 = Integer.MAX_VALUE - blockPos; - if (length31 <= 0) { - // Java ByteBuffers can't be longer than 2 GB, because they use - // 4-byte signed integers to represent capacity, etc. - // So we can't mmap the parts of the block higher than the 2 GB offset. - // FIXME: we could work around this with multiple memory maps. - // See HDFS-5101. - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " + - curPos + " of " + src + "; 31-bit MappedByteBuffer limit " + - "exceeded. blockPos=" + blockPos + ", curEnd=" + curEnd); - } - return null; - } - length = (int)length31; - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Reducing read length from " + maxLength + - " to " + length + " to avoid 31-bit limit. 
" + - "blockPos=" + blockPos + "; curPos=" + curPos + - "; curEnd=" + curEnd); - } - } - final ClientMmap clientMmap = blockReader.getClientMmap(opts); - if (clientMmap == null) { - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("unable to perform a zero-copy read from offset " + - curPos + " of " + src + "; BlockReader#getClientMmap returned " + - "null."); - } - return null; - } - boolean success = false; - ByteBuffer buffer; - try { - seek(curPos + length); - buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer(); - buffer.position((int)blockPos); - buffer.limit((int)(blockPos + length)); - getExtendedReadBuffers().put(buffer, clientMmap); - synchronized (infoLock) { - readStatistics.addZeroCopyBytes(length); - } - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("readZeroCopy read " + length + - " bytes from offset " + curPos + " via the zero-copy read " + - "path. blockEnd = " + blockEnd); - } - success = true; - } finally { - if (!success) { - IOUtils.closeQuietly(clientMmap); - } - } - return buffer; - } - - @Override - public synchronized void releaseBuffer(ByteBuffer buffer) { - if (buffer == EMPTY_BUFFER) return; - Object val = getExtendedReadBuffers().remove(buffer); - if (val == null) { - throw new IllegalArgumentException("tried to release a buffer " + - "that was not created by this stream, " + buffer); - } - if (val instanceof ClientMmap) { - IOUtils.closeQuietly((ClientMmap)val); - } else if (val instanceof ByteBufferPool) { - ((ByteBufferPool)val).putBuffer(buffer); - } - } - - @Override - public synchronized void unbuffer() { - closeCurrentBlockReaders(); - } -}
