http://git-wip-us.apache.org/repos/asf/hadoop/blob/7136e8c5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java index 12496e2..4eb4c52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java @@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; /** * Used for injecting faults in DFSClient and DFSOutputStream tests. - * Calls into this are a no-op in production code. + * Calls into this are a no-op in production code. */ @VisibleForTesting @InterfaceAudience.Private
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7136e8c5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java index c98cd5f..45bea5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java @@ -69,8 +69,8 @@ public class DFSInotifyEventInputStream { this(namenode, tracer, namenode.getCurrentEditLogTxid()); } - DFSInotifyEventInputStream(ClientProtocol namenode, - Tracer tracer, long lastReadTxid) throws IOException { + DFSInotifyEventInputStream(ClientProtocol namenode, Tracer tracer, + long lastReadTxid) { this.namenode = namenode; this.it = Iterators.emptyIterator(); this.lastReadTxid = lastReadTxid; @@ -94,8 +94,7 @@ public class DFSInotifyEventInputStream { * The next available batch of events will be returned. */ public EventBatch poll() throws IOException, MissingEventsException { - TraceScope scope = tracer.newScope("inotifyPoll"); - try { + try (TraceScope ignored = tracer.newScope("inotifyPoll")) { // need to keep retrying until the NN sends us the latest committed txid if (lastReadTxid == -1) { LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); @@ -119,7 +118,7 @@ public class DFSInotifyEventInputStream { } } else { LOG.debug("poll(): read no edits from the NN when requesting edits " + - "after txid {}", lastReadTxid); + "after txid {}", lastReadTxid); return null; } } @@ -130,8 +129,6 @@ public class DFSInotifyEventInputStream { } else { return null; } - } finally { - scope.close(); } } @@ -175,9 +172,8 @@ public class DFSInotifyEventInputStream { */ public EventBatch poll(long time, TimeUnit tu) throws IOException, InterruptedException, MissingEventsException { - TraceScope scope = tracer.newScope("inotifyPollWithTimeout"); - EventBatch next = null; - try { + EventBatch next; + try (TraceScope ignored = tracer.newScope("inotifyPollWithTimeout")) { long initialTime = Time.monotonicNow(); long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); long nextWait = INITIAL_WAIT_MS; @@ -195,8 +191,6 @@ public class DFSInotifyEventInputStream { nextWait); Thread.sleep(nextWait); } - } finally { - scope.close(); } return next; } @@ -212,9 +206,8 @@ public class DFSInotifyEventInputStream { */ public EventBatch take() throws IOException, InterruptedException, MissingEventsException { - TraceScope scope = tracer.newScope("inotifyTake"); - EventBatch next = null; - try { + EventBatch next; + try (TraceScope ignored = tracer.newScope("inotifyTake")) { int nextWaitMin = INITIAL_WAIT_MS; while ((next = poll()) == null) { // sleep for a random period between nextWaitMin and nextWaitMin * 2 @@ -225,8 +218,6 @@ public class DFSInotifyEventInputStream { // the maximum sleep is 2 minutes nextWaitMin = Math.min(60000, nextWaitMin * 2); } - } finally { - scope.close(); } return next; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7136e8c5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index ab5faae..6823c1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -54,11 +54,9 @@ import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileEncryptionInfo; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -84,13 +82,15 @@ import org.apache.htrace.core.Tracer; import com.google.common.annotations.VisibleForTesting; +import javax.annotation.Nonnull; + /**************************************************************** - * DFSInputStream provides bytes from a named file. It handles + * DFSInputStream provides bytes from a named file. It handles * negotiation of the namenode and various datanodes as necessary. ****************************************************************/ @InterfaceAudience.Private public class DFSInputStream extends FSInputStream -implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, + implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer { @VisibleForTesting public static boolean tcpReadsDisabledForTesting = false; @@ -127,7 +127,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, /** * Track the ByteBuffers that we have handed out to readers. - * + * * The value type can be either ByteBufferPool or ClientMmap, depending on * whether we this is a memory-mapped buffer or not. */ @@ -136,7 +136,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, private synchronized IdentityHashStore<ByteBuffer, Object> getExtendedReadBuffers() { if (extendedReadBuffers == null) { - extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0); + extendedReadBuffers = new IdentityHashStore<>(0); } return extendedReadBuffers; } @@ -176,7 +176,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, public long getTotalShortCircuitBytesRead() { return totalShortCircuitBytesRead; } - + /** * @return The total number of zero-copy bytes read. */ @@ -190,7 +190,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, public long getRemoteBytesRead() { return totalBytesRead - totalLocalBytesRead; } - + void addRemoteBytes(long amt) { this.totalBytesRead += amt; } @@ -219,7 +219,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, this.totalShortCircuitBytesRead = 0; this.totalZeroCopyBytesRead = 0; } - + private long totalBytesRead; private long totalLocalBytesRead; @@ -228,7 +228,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, private long totalZeroCopyBytesRead; } - + /** * This variable tracks the number of failures since the start of the * most recent user-facing operation. That is to say, it should be reset @@ -242,19 +242,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, */ protected int failures = 0; - /* XXX Use of CocurrentHashMap is temp fix. Need to fix + /* XXX Use of CocurrentHashMap is temp fix. Need to fix * parallel accesses to DFSInputStream (through ptreads) properly */ private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = - new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>(); + new ConcurrentHashMap<>(); private byte[] oneByteBuf; // used for 'int read()' void addToDeadNodes(DatanodeInfo dnInfo) { deadNodes.put(dnInfo, dnInfo); } - + DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, - LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException { + LocatedBlocks locatedBlocks) throws IOException { this.dfsClient = dfsClient; this.verifyChecksum = verifyChecksum; this.src = src; @@ -269,8 +269,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, * Grab the open-file info from namenode * @param refreshLocatedBlocks whether to re-fetch locatedblocks */ - void openInfo(boolean refreshLocatedBlocks) throws IOException, - UnresolvedLinkException { + void openInfo(boolean refreshLocatedBlocks) throws IOException { final DfsClientConf conf = dfsClient.getConf(); synchronized(infoLock) { lastBlockBeingWrittenLength = @@ -343,7 +342,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } final long len = readBlockLength(last); last.getBlock().setNumBytes(len); - lastBlockBeingWrittenLength = len; + lastBlockBeingWrittenLength = len; } } @@ -356,30 +355,30 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, private long readBlockLength(LocatedBlock locatedblock) throws IOException { assert locatedblock != null : "LocatedBlock cannot be null"; int replicaNotFoundCount = locatedblock.getLocations().length; - + final DfsClientConf conf = dfsClient.getConf(); for(DatanodeInfo datanode : locatedblock.getLocations()) { ClientDatanodeProtocol cdp = null; - + try { cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode, dfsClient.getConfiguration(), conf.getSocketTimeout(), conf.isConnectToDnViaHostname(), locatedblock); - + final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); - + if (n >= 0) { return n; } } catch(IOException ioe) { if (ioe instanceof RemoteException && - (((RemoteException) ioe).unwrapRemoteException() instanceof - ReplicaNotFoundException)) { + (((RemoteException) ioe).unwrapRemoteException() instanceof + ReplicaNotFoundException)) { // special case : replica might not be on the DN, treat as 0 length replicaNotFoundCount--; } - + DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}" + " for block {}", datanode, locatedblock.getBlock(), ioe); } finally { @@ -399,7 +398,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, throw new IOException("Cannot obtain block length for " + locatedblock); } - + public long getFileLength() { synchronized(infoLock) { return locatedBlocks == null? 0: @@ -423,7 +422,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } /** - * Returns the block containing the target position. + * Returns the block containing the target position. */ synchronized public ExtendedBlock getCurrentBlock() { if (currentLocatedBlock == null){ @@ -442,7 +441,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, /** * Get block at the specified position. * Fetch it from the namenode if not cached. - * + * * @param offset block corresponding to this offset in file is returned * @return located block * @throws IOException @@ -525,12 +524,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, blocks = getFinalizedBlockRange(offset, Math.min(length, lengthOfCompleteBlk - offset)); } else { - blocks = new ArrayList<LocatedBlock>(1); + blocks = new ArrayList<>(1); } // get the blocks from incomplete block range if (readLengthPastCompleteBlk) { - blocks.add(locatedBlocks.getLastLocatedBlock()); + blocks.add(locatedBlocks.getLastLocatedBlock()); } return blocks; @@ -546,7 +545,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, long offset, long length) throws IOException { synchronized(infoLock) { assert (locatedBlocks != null) : "locatedBlocks is null"; - List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>(); + List<LocatedBlock> blockRange = new ArrayList<>(); // search cached blocks first int blockIdx = locatedBlocks.findBlock(offset); if (blockIdx < 0) { // block is not cached @@ -590,10 +589,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, // // Connect to best DataNode for desired Block, with potential offset // - DatanodeInfo chosenNode = null; + DatanodeInfo chosenNode; int refetchToken = 1; // only need to get a new access token once int refetchEncryptionKey = 1; // only need to get a new encryption key once - + boolean connectFailedOnce = false; while (true) { @@ -638,7 +637,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } else { connectFailedOnce = true; DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block" - + ", add to deadNodes and continue. " + ex, ex); + + ", add to deadNodes and continue. " + ex, ex); // Put chosen node into dead list, continue addToDeadNodes(chosenNode); } @@ -721,8 +720,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, * strategy-agnostic. */ interface ReaderStrategy { - public int doRead(BlockReader blockReader, int off, int len) - throws ChecksumException, IOException; + int doRead(BlockReader blockReader, int off, int len) + throws IOException; /** * Copy data from the src ByteBuffer into the read buffer. @@ -732,7 +731,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, * @param length Useful only when the ReadStrategy is based on a byte array. * Indicate the length of the data to copy. */ - public int copyFrom(ByteBuffer src, int offset, int length); + int copyFrom(ByteBuffer src, int offset, int length); } protected void updateReadStatistics(ReadStatistics readStatistics, @@ -748,7 +747,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } } } - + /** * Used to read bytes into a byte[] */ @@ -761,7 +760,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, @Override public int doRead(BlockReader blockReader, int off, int len) - throws ChecksumException, IOException { + throws IOException { int nRead = blockReader.read(buf, off, len); updateReadStatistics(readStatistics, nRead, blockReader); return nRead; @@ -786,7 +785,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, @Override public int doRead(BlockReader blockReader, int off, int len) - throws ChecksumException, IOException { + throws IOException { int oldpos = buf.position(); int oldlimit = buf.limit(); boolean success = false; @@ -804,7 +803,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, buf.position(oldpos); buf.limit(oldlimit); } - } + } } @Override @@ -820,12 +819,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, /* This is a used by regular read() and handles ChecksumExceptions. * name readBuffer() is chosen to imply similarity to readBuffer() in * ChecksumFileSystem - */ + */ private synchronized int readBuffer(ReaderStrategy reader, int off, int len, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) throws IOException { IOException ioe; - + /* we retry current node only once. So this is set to true only here. * Intention is to handle one common case of an error that is not a * failure on datanode or client : when DataNode closes the connection @@ -841,7 +840,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } catch ( ChecksumException ce ) { DFSClient.LOG.warn("Found Checksum error for " + getCurrentBlock() + " from " + currentNode - + " at " + ce.getPos()); + + " at " + ce.getPos()); ioe = ce; retryCurrentNode = false; // we want to remember which block replicas we have tried @@ -855,12 +854,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } ioe = e; } - boolean sourceFound = false; + boolean sourceFound; if (retryCurrentNode) { /* possibly retry the same node so that transient errors don't * result in application level failures (e.g. Datanode could have * closed the connection because the client is idle for too long). - */ + */ sourceFound = seekToBlockSource(pos); } else { addToDeadNodes(currentNode); @@ -878,8 +877,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, if (closed.get()) { throw new IOException("Stream closed"); } - Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap - = new HashMap<ExtendedBlock, Set<DatanodeInfo>>(); + Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>(); failures = 0; if (pos < getFileLength()) { int retries = 2; @@ -898,7 +896,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } } int result = readBuffer(strategy, off, realLen, corruptedBlockMap); - + if (result >= 0) { pos += result; } else { @@ -910,7 +908,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } return result; } catch (ChecksumException ce) { - throw ce; + throw ce; } catch (IOException e) { if (retries == 1) { DFSClient.LOG.warn("DFS Read", e); @@ -923,7 +921,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } finally { // Check if need to report block replicas corruption either read // was successful or ChecksumException occured. - reportCheckSumFailure(corruptedBlockMap, + reportCheckSumFailure(corruptedBlockMap, currentLocatedBlock.getLocations().length); } } @@ -935,26 +933,21 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, * Read the entire buffer. */ @Override - public synchronized int read(final byte buf[], int off, int len) throws IOException { + public synchronized int read(@Nonnull final byte buf[], int off, int len) + throws IOException { ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf); - TraceScope scope = - dfsClient.newPathTraceScope("DFSInputStream#byteArrayRead", src); - try { + try (TraceScope ignored = + dfsClient.newPathTraceScope("DFSInputStream#byteArrayRead", src)) { return readWithStrategy(byteArrayReader, off, len); - } finally { - scope.close(); } } @Override public synchronized int read(final ByteBuffer buf) throws IOException { ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf); - TraceScope scope = - dfsClient.newPathTraceScope("DFSInputStream#byteBufferRead", src); - try { + try (TraceScope ignored = + dfsClient.newPathTraceScope("DFSInputStream#byteBufferRead", src)){ return readWithStrategy(byteBufferReader, 0, buf.remaining()); - } finally { - scope.close(); } } @@ -964,11 +957,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, */ protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { - Set<DatanodeInfo> dnSet = null; + Set<DatanodeInfo> dnSet; if((corruptedBlockMap.containsKey(blk))) { dnSet = corruptedBlockMap.get(blk); }else { - dnSet = new HashSet<DatanodeInfo>(); + dnSet = new HashSet<>(); } if (!dnSet.contains(node)) { dnSet.add(node); @@ -984,7 +977,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, return result; } else { String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), - deadNodes, ignoredNodes); + deadNodes, ignoredNodes); String blockInfo = block.getBlock() + " file=" + src; if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) { String description = "Could not obtain block: " + blockInfo; @@ -1010,7 +1003,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, // Also at the second retry, the waiting window is expanded to 6000 ms // alleviating the request rate from the server. Similarly the 3rd retry // will wait 6000ms grace period before retry and the waiting window is - // expanded to 9000ms. + // expanded to 9000ms. final int timeWindow = dfsClient.getConf().getTimeWindow(); double waitTime = timeWindow * failures + // grace period for the last round of attempt // expanding time window for each failure @@ -1018,7 +1011,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, ThreadLocalRandom.current().nextDouble(); DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); Thread.sleep((long)waitTime); - } catch (InterruptedException iex) { + } catch (InterruptedException ignored) { } deadNodes.clear(); //2nd option is to remove only nodes[blockId] openInfo(true); @@ -1130,14 +1123,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, public ByteBuffer call() throws Exception { byte[] buf = bb.array(); int offset = bb.position(); - TraceScope scope = dfsClient.getTracer(). - newScope("hedgedRead" + hedgedReadId, parentSpanId); - try { + try (TraceScope ignored = dfsClient.getTracer(). + newScope("hedgedRead" + hedgedReadId, parentSpanId)) { actualGetFromOneDataNode(datanode, block, start, end, buf, offset, corruptedBlockMap); return bb; - } finally { - scope.close(); } } }; @@ -1243,12 +1233,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) throws IOException { final DfsClientConf conf = dfsClient.getConf(); - ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>(); + ArrayList<Future<ByteBuffer>> futures = new ArrayList<>(); CompletionService<ByteBuffer> hedgedService = - new ExecutorCompletionService<ByteBuffer>( - dfsClient.getHedgedReadsThreadPool()); - ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>(); - ByteBuffer bb = null; + new ExecutorCompletionService<>(dfsClient.getHedgedReadsThreadPool()); + ArrayList<DatanodeInfo> ignored = new ArrayList<>(); + ByteBuffer bb; int len = (int) (end - start + 1); int hedgedReadId = 0; block = refreshLocatedBlock(block); @@ -1280,11 +1269,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, // Ignore this node on next go around. ignored.add(chosenNode.info); dfsClient.getHedgedReadMetrics().incHedgedReadOps(); - continue; // no need to refresh block locations - } catch (InterruptedException e) { + // continue; no need to refresh block locations + } catch (InterruptedException | ExecutionException e) { // Ignore - } catch (ExecutionException e) { - // Ignore already logged in the call. } } else { // We are starting up a 'hedged' read. We have a read already @@ -1349,10 +1336,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, ByteBuffer bb = future.get(); futures.remove(future); return bb; - } catch (ExecutionException e) { - // already logged in the Callable - futures.remove(future); - } catch (CancellationException ce) { + } catch (ExecutionException | CancellationException e) { // already logged in the Callable futures.remove(future); } @@ -1373,7 +1357,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, /** * Should the block access token be refetched on an exception - * + * * @param ex Exception received * @param targetAddr Target datanode address from where exception was received * @return true if block access token has expired or invalid and it should be @@ -1401,23 +1385,20 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, /** * Read bytes starting from the specified position. - * + * * @param position start read from this position * @param buffer read buffer * @param offset offset into buffer * @param length number of bytes to read - * + * * @return actual number of bytes read */ @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { - TraceScope scope = dfsClient. - newPathTraceScope("DFSInputStream#byteArrayPread", src); - try { + try (TraceScope ignored = dfsClient. + newPathTraceScope("DFSInputStream#byteArrayPread", src)) { return pread(position, buffer, offset, length); - } finally { - scope.close(); } } @@ -1437,13 +1418,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, if ((position + length) > filelen) { realLen = (int)(filelen - position); } - + // determine the block and byte range within the block // corresponding to position and realLen List<LocatedBlock> blockRange = getBlockRange(position, realLen); int remaining = realLen; - Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap - = new HashMap<ExtendedBlock, Set<DatanodeInfo>>(); + Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>(); for (LocatedBlock blk : blockRange) { long targetStart = position - blk.getStartOffset(); long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); @@ -1472,12 +1452,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } return realLen; } - + /** * DFSInputStream reports checksum failure. * Case I : client has tried multiple data nodes and at least one of the * attempts has succeeded. We report the other failures as corrupted block to - * namenode. + * namenode. * Case II: client has tried out all data nodes, but all failed. We * only report if the total number of replica is 1. We do not * report otherwise since this maybe due to the client is a handicapped client @@ -1486,7 +1466,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, * @param dataNodeCount number of data nodes who contains the block replicas */ protected void reportCheckSumFailure( - Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, + Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, int dataNodeCount) { if (corruptedBlockMap.isEmpty()) { return; @@ -1553,8 +1533,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } else { // The range was already checked. If the block reader returns // something unexpected instead of throwing an exception, it is - // most likely a bug. - String errMsg = "BlockReader failed to seek to " + + // most likely a bug. + String errMsg = "BlockReader failed to seek to " + targetPos + ". Instead, it seeked to " + pos + "."; DFSClient.LOG.warn(errMsg); throw new IOException(errMsg); @@ -1580,10 +1560,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, currentNode = blockSeekTo(targetPos); return true; } - + /** * Seek to given position on a node other than the current node. If - * a node other than the current node is found, then returns true. + * a node other than the current node is found, then returns true. * If another node could not be found, then returns false. */ @Override @@ -1596,7 +1576,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, DatanodeInfo oldNode = currentNode; DatanodeInfo newNode = blockSeekTo(targetPos); if (!markedDead) { - /* remove it from deadNodes. blockSeekTo could have cleared + /* remove it from deadNodes. blockSeekTo could have cleared * deadNodes and added currentNode again. Thats ok. */ deadNodes.remove(oldNode); } @@ -1607,7 +1587,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, return false; } } - + /** */ @Override @@ -1684,7 +1664,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, protected void closeCurrentBlockReaders() { if (blockReader == null) return; - // Close the current block reader so that the new caching settings can + // Close the current block reader so that the new caching settings can // take effect immediately. try { blockReader.close(); @@ -1720,11 +1700,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, * zero-copy read. */ private static final ByteBuffer EMPTY_BUFFER = - ByteBuffer.allocateDirect(0).asReadOnlyBuffer(); + ByteBuffer.allocateDirect(0).asReadOnlyBuffer(); @Override public synchronized ByteBuffer read(ByteBufferPool bufferPool, - int maxLength, EnumSet<ReadOption> opts) + int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException { if (maxLength == 0) { return EMPTY_BUFFER; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7136e8c5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 6039177..ed8c41d 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSOutputSummer; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileEncryptionInfo; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.permission.FsPermission; @@ -91,7 +90,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable, CanSetDropBehind { static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class); /** - * Number of times to retry creating a file when there are transient + * Number of times to retry creating a file when there are transient * errors (typically related to encryption zones and KeyProvider operations). */ @VisibleForTesting @@ -122,8 +121,9 @@ public class DFSOutputStream extends FSOutputSummer private FileEncryptionInfo fileEncryptionInfo; /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ - protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, - long seqno, boolean lastPacketInBlock) throws InterruptedIOException { + protected DFSPacket createPacket(int packetSize, int chunksPerPkt, + long offsetInBlock, long seqno, boolean lastPacketInBlock) + throws InterruptedIOException { final byte[] buf; final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize; @@ -160,9 +160,7 @@ public class DFSOutputStream extends FSOutputSummer return null; } DatanodeInfo[] value = new DatanodeInfo[currentNodes.length]; - for (int i = 0; i < currentNodes.length; i++) { - value[i] = currentNodes[i]; - } + System.arraycopy(currentNodes, 0, value, 0, currentNodes.length); return value; } @@ -180,8 +178,8 @@ public class DFSOutputStream extends FSOutputSummer return checksum; } - private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, - HdfsFileStatus stat, DataChecksum checksum) throws IOException { + private DFSOutputStream(DFSClient dfsClient, String src, + Progressable progress, HdfsFileStatus stat, DataChecksum checksum) { super(getChecksum4Compute(checksum, stat)); this.dfsClient = dfsClient; this.src = src; @@ -189,7 +187,7 @@ public class DFSOutputStream extends FSOutputSummer this.blockSize = stat.getBlockSize(); this.blockReplication = stat.getReplication(); this.fileEncryptionInfo = stat.getFileEncryptionInfo(); - this.cachingStrategy = new AtomicReference<CachingStrategy>( + this.cachingStrategy = new AtomicReference<>( dfsClient.getDefaultWriteCachingStrategy()); if (progress != null) { DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream " @@ -203,21 +201,22 @@ public class DFSOutputStream extends FSOutputSummer } if (blockSize % bytesPerChecksum != 0) { throw new HadoopIllegalArgumentException("Invalid values: " - + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum - + ") must divide block size (=" + blockSize + ")."); + + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + + " (=" + bytesPerChecksum + ") must divide block size (=" + + blockSize + ")."); } this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager(); } /** Construct a new output stream for creating a file. */ - protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, - EnumSet<CreateFlag> flag, Progressable progress, - DataChecksum checksum, String[] favoredNodes, boolean createStreamer) - throws IOException { + protected DFSOutputStream(DFSClient dfsClient, String src, + HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress, + DataChecksum checksum, String[] favoredNodes, boolean createStreamer) { this(dfsClient, src, progress, stat, checksum); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); - computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); + computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), + bytesPerChecksum); if (createStreamer) { streamer = new DataStreamer(stat, null, dfsClient, src, progress, @@ -227,11 +226,10 @@ public class DFSOutputStream extends FSOutputSummer static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, - short replication, long blockSize, Progressable progress, int buffersize, + short replication, long blockSize, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException { - TraceScope scope = - dfsClient.newPathTraceScope("newStreamForCreate", src); - try { + try (TraceScope ignored = + dfsClient.newPathTraceScope("newStreamForCreate", src)) { HdfsFileStatus stat = null; // Retry the create if we get a RetryStartFileException up to a maximum @@ -242,7 +240,7 @@ public class DFSOutputStream extends FSOutputSummer shouldRetry = false; try { stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, - new EnumSetWritable<CreateFlag>(flag), createParent, replication, + new EnumSetWritable<>(flag), createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS); break; } catch (RemoteException re) { @@ -283,8 +281,6 @@ public class DFSOutputStream extends FSOutputSummer } out.start(); return out; - } finally { - scope.close(); } } @@ -304,17 +300,17 @@ public class DFSOutputStream extends FSOutputSummer // The last partial block of the file has to be filled. if (!toNewBlock && lastBlock != null) { // indicate that we are appending to an existing block - streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum, - cachingStrategy, byteArrayManager); + streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, + checksum, cachingStrategy, byteArrayManager); getStreamer().setBytesCurBlock(lastBlock.getBlockSize()); adjustPacketChunkSize(stat); getStreamer().setPipelineInConstruction(lastBlock); } else { computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); - streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null, - dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, - favoredNodes); + streamer = new DataStreamer(stat, + lastBlock != null ? lastBlock.getBlock() : null, dfsClient, src, + progress, checksum, cachingStrategy, byteArrayManager, favoredNodes); } } @@ -355,21 +351,19 @@ public class DFSOutputStream extends FSOutputSummer } static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, - EnumSet<CreateFlag> flags, int bufferSize, Progressable progress, - LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, - String[] favoredNodes) throws IOException { - TraceScope scope = - dfsClient.newPathTraceScope("newStreamForAppend", src); + EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock, + HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) + throws IOException { if(stat.getErasureCodingPolicy() != null) { - throw new IOException("Not support appending to a striping layout file yet."); + throw new IOException( + "Not support appending to a striping layout file yet."); } - try { + try (TraceScope ignored = + dfsClient.newPathTraceScope("newStreamForAppend", src)) { final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, progress, lastBlock, stat, checksum, favoredNodes); out.start(); return out; - } finally { - scope.close(); } } @@ -486,36 +480,28 @@ public class DFSOutputStream extends FSOutputSummer * of the DNs but not necessarily in the DN's OS buffers. * * It is a synchronous operation. When it returns, - * it guarantees that flushed data become visible to new readers. - * It is not guaranteed that data has been flushed to - * persistent store on the datanode. + * it guarantees that flushed data become visible to new readers. + * It is not guaranteed that data has been flushed to + * persistent store on the datanode. * Block allocations are persisted on namenode. */ @Override public void hflush() throws IOException { - TraceScope scope = - dfsClient.newPathTraceScope("hflush", src); - try { + try (TraceScope ignored = dfsClient.newPathTraceScope("hflush", src)) { flushOrSync(false, EnumSet.noneOf(SyncFlag.class)); - } finally { - scope.close(); } } @Override public void hsync() throws IOException { - TraceScope scope = - dfsClient.newPathTraceScope("hsync", src); - try { + try (TraceScope ignored = dfsClient.newPathTraceScope("hsync", src)) { flushOrSync(true, EnumSet.noneOf(SyncFlag.class)); - } finally { - scope.close(); } } /** - * The expected semantics is all data have flushed out to all replicas - * and all replicas have done posix fsync equivalent - ie the OS has + * The expected semantics is all data have flushed out to all replicas + * and all replicas have done posix fsync equivalent - ie the OS has * flushed it to the disk device (but the disk may have it in its cache). * * Note that only the current block is flushed to the disk device. @@ -527,12 +513,8 @@ public class DFSOutputStream extends FSOutputSummer * whether or not to update the block length in NameNode. */ public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException { - TraceScope scope = - dfsClient.newPathTraceScope("hsync", src); - try { + try (TraceScope ignored = dfsClient.newPathTraceScope("hsync", src)) { flushOrSync(true, syncFlags); - } finally { - scope.close(); } } @@ -637,13 +619,14 @@ public class DFSOutputStream extends FSOutputSummer dfsClient.namenode.fsync(src, fileId, dfsClient.clientName, lastBlockLength); } catch (IOException ioe) { - DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe); - // If we got an error here, it might be because some other thread called - // close before our hflush completed. In that case, we should throw an - // exception that the stream is closed. + DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, + ioe); + // If we got an error here, it might be because some other thread + // called close before our hflush completed. In that case, we should + // throw an exception that the stream is closed. checkClosed(); - // If we aren't closed but failed to sync, we should expose that to the - // caller. + // If we aren't closed but failed to sync, we should expose that to + // the caller. throw ioe; } } @@ -654,9 +637,9 @@ public class DFSOutputStream extends FSOutputSummer } } } catch (InterruptedIOException interrupt) { - // This kind of error doesn't mean that the stream itself is broken - just the - // flushing thread got interrupted. So, we shouldn't close down the writer, - // but instead just propagate the error + // This kind of error doesn't mean that the stream itself is broken - just + // the flushing thread got interrupted. So, we shouldn't close down the + // writer, but instead just propagate the error throw interrupt; } catch (IOException e) { DFSClient.LOG.warn("Error while syncing", e); @@ -698,8 +681,8 @@ public class DFSOutputStream extends FSOutputSummer } /** - * Waits till all existing data is flushed and confirmations - * received from datanodes. + * Waits till all existing data is flushed and confirmations + * received from datanodes. */ protected void flushInternal() throws IOException { long toWaitFor; @@ -722,7 +705,7 @@ public class DFSOutputStream extends FSOutputSummer } /** - * Aborts this output stream and releases any system + * Aborts this output stream and releases any system * resources associated with this stream. */ synchronized void abort() throws IOException { @@ -730,7 +713,7 @@ public class DFSOutputStream extends FSOutputSummer return; } getStreamer().getLastException().set(new IOException("Lease timeout of " - + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired.")); + + (dfsClient.getConf().getHdfsTimeout() / 1000) + " seconds expired.")); closeThreads(true); dfsClient.endFileLease(fileId); } @@ -760,17 +743,14 @@ public class DFSOutputStream extends FSOutputSummer } /** - * Closes this output stream and releases any system + * Closes this output stream and releases any system * resources associated with this stream. */ @Override public synchronized void close() throws IOException { - TraceScope scope = - dfsClient.newPathTraceScope("DFSOutputStream#close", src); - try { + try (TraceScope ignored = + dfsClient.newPathTraceScope("DFSOutputStream#close", src)) { closeImpl(); - } finally { - scope.close(); } } @@ -795,20 +775,18 @@ public class DFSOutputStream extends FSOutputSummer // get last block before destroying the streamer ExtendedBlock lastBlock = getStreamer().getBlock(); closeThreads(false); - TraceScope scope = dfsClient.getTracer().newScope("completeFile"); - try { + try (TraceScope ignored = + dfsClient.getTracer().newScope("completeFile")) { completeFile(lastBlock); - } finally { - scope.close(); } dfsClient.endFileLease(fileId); - } catch (ClosedChannelException e) { + } catch (ClosedChannelException ignored) { } finally { setClosed(); } } - // should be called holding (this) lock since setTestFilename() may + // should be called holding (this) lock since setTestFilename() may // be called during unit tests protected void completeFile(ExtendedBlock last) throws IOException { long localstart = Time.monotonicNow(); @@ -824,12 +802,11 @@ public class DFSOutputStream extends FSOutputSummer if (!dfsClient.clientRunning || (hdfsTimeout > 0 && localstart + hdfsTimeout < Time.monotonicNow())) { - String msg = "Unable to close file because dfsclient " + - " was unable to contact the HDFS servers." + - " clientRunning " + dfsClient.clientRunning + - " hdfsTimeout " + hdfsTimeout; - DFSClient.LOG.info(msg); - throw new IOException(msg); + String msg = "Unable to close file because dfsclient " + + " was unable to contact the HDFS servers. clientRunning " + + dfsClient.clientRunning + " hdfsTimeout " + hdfsTimeout; + DFSClient.LOG.info(msg); + throw new IOException(msg); } try { if (retries == 0) { @@ -922,9 +899,9 @@ public class DFSOutputStream extends FSOutputSummer return getClass().getSimpleName() + ":" + streamer; } - static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient, - String src, ExtendedBlock prevBlock, long fileId, String[] favoredNodes) - throws IOException { + static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, + DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId, + String[] favoredNodes) throws IOException { final DfsClientConf conf = dfsClient.getConf(); int retries = conf.getNumBlockWriteLocateFollowingRetry(); long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); @@ -943,7 +920,8 @@ public class DFSOutputStream extends FSOutputSummer if (ue != e) { throw ue; // no need to retry these exceptions } - if (NotReplicatedYetException.class.getName().equals(e.getClassName())) { + if (NotReplicatedYetException.class.getName() + .equals(e.getClassName())) { if (retries == 0) { throw e; } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7136e8c5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java index 191691b..7fe1278 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java @@ -153,7 +153,6 @@ public class DFSPacket { /** * Write the full packet, including the header, to the given output stream. * - * @param stm * @throws IOException */ public synchronized void writeTo(DataOutputStream stm) throws IOException { @@ -187,15 +186,18 @@ public class DFSPacket { // corrupt the data for testing. if (DFSClientFaultInjector.get().corruptPacket()) { - buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; + buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= + 0xff; } // Write the now contiguous full packet to the output stream. - stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen); + stm.write(buf, headerStart, + header.getSerializedSize() + checksumLen + dataLen); // undo corruption. if (DFSClientFaultInjector.get().uncorruptPacket()) { - buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; + buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= + 0xff; } } @@ -207,8 +209,6 @@ public class DFSPacket { /** * Release the buffer in this packet to ByteArrayManager. - * - * @param bam */ synchronized void releaseBuffer(ByteArrayManager bam) { bam.release(buf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7136e8c5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 69105a0..8265090 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -33,7 +33,6 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; @@ -586,7 +585,7 @@ public class DFSStripedInputStream extends DFSInputStream { abstract void prepareDecodeInputs(); /** prepare the parity chunk and block reader if necessary */ - abstract boolean prepareParityChunk(int index) throws IOException; + abstract boolean prepareParityChunk(int index); abstract void decode(); @@ -878,7 +877,7 @@ public class DFSStripedInputStream extends DFSInputStream { } @Override - boolean prepareParityChunk(int index) throws IOException { + boolean prepareParityChunk(int index) { Preconditions.checkState(index >= dataBlkNum && alignedStripe.chunks[index] == null); if (blockReaders[index] != null && blockReaders[index].shouldSkip) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7136e8c5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index bf4e10e..88e4b8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -892,11 +892,9 @@ public class DFSStripedOutputStream extends DFSOutputStream { } closeThreads(false); - TraceScope scope = dfsClient.getTracer().newScope("completeFile"); - try { + try (TraceScope ignored = + dfsClient.getTracer().newScope("completeFile")) { completeFile(currentBlockGroup); - } finally { - scope.close(); } dfsClient.endFileLease(fileId); } catch (ClosedChannelException ignored) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7136e8c5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index d1829d6..c14d5d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -304,8 +304,8 @@ public class DFSUtilClient { * @param keys Set of keys to look for in the order of preference * @return a map(nameserviceId to map(namenodeId to InetSocketAddress)) */ - static Map<String, Map<String, InetSocketAddress>> - getAddresses(Configuration conf, String defaultAddress, String... keys) { + static Map<String, Map<String, InetSocketAddress>> getAddresses( + Configuration conf, String defaultAddress, String... keys) { Collection<String> nameserviceIds = getNameServiceIds(conf); return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys); } @@ -318,8 +318,7 @@ public class DFSUtilClient { * * @return a map(nameserviceId to map(namenodeId to InetSocketAddress)) */ - static Map<String, Map<String, InetSocketAddress>> - getAddressesForNsIds( + static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds( Configuration conf, Collection<String> nsIds, String defaultAddress, String... keys) { // Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>] @@ -327,7 +326,7 @@ public class DFSUtilClient { Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap(); for (String nsId : emptyAsSingletonNull(nsIds)) { Map<String, InetSocketAddress> isas = - getAddressesForNameserviceId(conf, nsId, defaultAddress, keys); + getAddressesForNameserviceId(conf, nsId, defaultAddress, keys); if (!isas.isEmpty()) { ret.put(nsId, isas); } @@ -534,7 +533,7 @@ public class DFSUtilClient { public static Peer peerFromSocket(Socket socket) throws IOException { - Peer peer = null; + Peer peer; boolean success = false; try { // TCP_NODELAY is crucial here because of bad interactions between @@ -561,7 +560,7 @@ public class DFSUtilClient { return peer; } finally { if (!success) { - if (peer != null) peer.close(); + // peer is always null so no need to call peer.close(). socket.close(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7136e8c5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 683d98d..4260a9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -40,7 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; @@ -72,7 +71,6 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; -import org.apache.htrace.core.Sampler; import org.apache.htrace.core.Span; import org.apache.htrace.core.SpanId; import org.apache.htrace.core.TraceScope; @@ -87,6 +85,8 @@ import com.google.common.cache.RemovalNotification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + /********************************************************************* * * The DataStreamer class is responsible for sending data packets to the @@ -128,7 +128,8 @@ class DataStreamer extends Daemon { final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr); final Socket sock = client.socketFactory.createSocket(); final int timeout = client.getDatanodeReadTimeout(length); - NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout()); + NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), + conf.getSocketTimeout()); sock.setSoTimeout(timeout); sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); LOG.debug("Send buf size {}", sock.getSendBufferSize()); @@ -157,7 +158,7 @@ class DataStreamer extends Daemon { } packets.clear(); } - + class LastExceptionInStreamer { private IOException thrown; @@ -448,12 +449,11 @@ class DataStreamer extends Daemon { * Construct a data streamer for appending to the last partial block * @param lastBlock last block of the file to be appended * @param stat status of the file to be appended - * @throws IOException if error occurs */ DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, - ByteArrayManager byteArrayManage) throws IOException { + ByteArrayManager byteArrayManage) { this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, true, null); stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; @@ -541,7 +541,7 @@ class DataStreamer extends Daemon { // process datanode IO errors if any boolean doSleep = processDatanodeOrExternalError(); - final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2; + final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2; synchronized (dataQueue) { // wait for a packet to be sent. long now = Time.monotonicNow(); @@ -644,9 +644,8 @@ class DataStreamer extends Daemon { LOG.debug(this + " sending " + one); // write out data to remote datanode - TraceScope writeScope = dfsClient.getTracer(). - newScope("DataStreamer#writeTo", spanId); - try { + try (TraceScope ignored = dfsClient.getTracer(). + newScope("DataStreamer#writeTo", spanId)) { one.writeTo(blockStream); blockStream.flush(); } catch (IOException e) { @@ -658,8 +657,6 @@ class DataStreamer extends Daemon { // will be taken out then. errorState.markFirstNodeIfNotMarked(); throw e; - } finally { - writeScope.close(); } lastPacket = Time.monotonicNow(); @@ -749,9 +746,8 @@ class DataStreamer extends Daemon { * @throws IOException */ void waitForAckedSeqno(long seqno) throws IOException { - TraceScope scope = dfsClient.getTracer(). - newScope("waitForAckedSeqno"); - try { + try (TraceScope ignored = dfsClient.getTracer(). + newScope("waitForAckedSeqno")) { LOG.debug("Waiting for ack for: {}", seqno); long begin = Time.monotonicNow(); try { @@ -771,15 +767,13 @@ class DataStreamer extends Daemon { } } checkClosed(); - } catch (ClosedChannelException e) { + } catch (ClosedChannelException cce) { } long duration = Time.monotonicNow() - begin; if (duration > dfsclientSlowLogThresholdMs) { LOG.warn("Slow waitForAckedSeqno took " + duration + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)"); } - } finally { - scope.close(); } } @@ -826,7 +820,7 @@ class DataStreamer extends Daemon { } checkClosed(); queuePacket(packet); - } catch (ClosedChannelException e) { + } catch (ClosedChannelException ignored) { } } } @@ -928,10 +922,7 @@ class DataStreamer extends Daemon { assert false; } - if (addr != null && NetUtils.isLocalAddress(addr)) { - return true; - } - return false; + return addr != null && NetUtils.isLocalAddress(addr); } // @@ -1169,17 +1160,15 @@ class DataStreamer extends Daemon { ) throws IOException { if (nodes.length != original.length + 1) { throw new IOException( - new StringBuilder() - .append("Failed to replace a bad datanode on the existing pipeline ") - .append("due to no more good datanodes being available to try. ") - .append("(Nodes: current=").append(Arrays.asList(nodes)) - .append(", original=").append(Arrays.asList(original)).append("). ") - .append("The current failed datanode replacement policy is ") - .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ") - .append("a client may configure this via '") - .append(BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY) - .append("' in its configuration.") - .toString()); + "Failed to replace a bad datanode on the existing pipeline " + + "due to no more good datanodes being available to try. " + + "(Nodes: current=" + Arrays.asList(nodes) + + ", original=" + Arrays.asList(original) + "). " + + "The current failed datanode replacement policy is " + + dfsClient.dtpReplaceDatanodeOnFailure + + ", and a client may configure this via '" + + BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY + + "' in its configuration."); } for(int i = 0; i < nodes.length; i++) { int j = 0; @@ -1228,7 +1217,7 @@ class DataStreamer extends Daemon { final StorageType[] originalTypes = storageTypes; final String[] originalIDs = storageIDs; IOException caughtException = null; - ArrayList<DatanodeInfo> exclude = new ArrayList<DatanodeInfo>(failed); + ArrayList<DatanodeInfo> exclude = new ArrayList<>(failed); while (tried < 3) { LocatedBlock lb; //get a new datanode @@ -1267,7 +1256,8 @@ class DataStreamer extends Daemon { private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, - final Token<BlockTokenIdentifier> blockToken) throws IOException { + final Token<BlockTokenIdentifier> blockToken) + throws IOException { //transfer replica to the new datanode Socket sock = null; DataOutputStream out = null; @@ -1278,7 +1268,8 @@ class DataStreamer extends Daemon { // transfer timeout multiplier based on the transfer size // One per 200 packets = 12.8MB. Minimum is 2. - int multi = 2 + (int)(bytesSent/dfsClient.getConf().getWritePacketSize())/200; + int multi = 2 + (int)(bytesSent /dfsClient.getConf().getWritePacketSize()) + / 200; final long readTimeout = dfsClient.getDatanodeReadTimeout(multi); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); @@ -1448,12 +1439,14 @@ class DataStreamer extends Daemon { // good reports should follow bad ones, if client committed // with those nodes. Thread.sleep(2000); - } catch (InterruptedException ie) {} + } catch (InterruptedException ignored) { + } } } private LocatedBlock updateBlockForPipeline() throws IOException { - return dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName); + return dfsClient.namenode.updateBlockForPipeline(block, + dfsClient.clientName); } static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) { @@ -1480,11 +1473,11 @@ class DataStreamer extends Daemon { * Returns the list of target datanodes. */ protected LocatedBlock nextBlockOutputStream() throws IOException { - LocatedBlock lb = null; - DatanodeInfo[] nodes = null; - StorageType[] storageTypes = null; - int count = getNumBlockWriteRetry(); - boolean success = false; + LocatedBlock lb; + DatanodeInfo[] nodes; + StorageType[] storageTypes; + int count = dfsClient.getConf().getNumBlockWriteRetry(); + boolean success; ExtendedBlock oldBlock = block; do { errorState.resetInternalError(); @@ -1534,7 +1527,6 @@ class DataStreamer extends Daemon { LOG.info("nodes are empty for write pipeline of " + block); return false; } - Status pipelineStatus = SUCCESS; String firstBadLink = ""; boolean checkRestart = false; if (LOG.isDebugEnabled()) { @@ -1569,25 +1561,26 @@ class DataStreamer extends Daemon { // Xmit header info to datanode // - BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage; + BlockConstructionStage bcs = recoveryFlag ? + stage.getRecoveryStage() : stage; // We cannot change the block length in 'block' as it counts the number // of bytes ack'ed. ExtendedBlock blockCopy = new ExtendedBlock(block); blockCopy.setNumBytes(stat.getBlockSize()); - boolean[] targetPinnings = getPinnings(nodes, true); + boolean[] targetPinnings = getPinnings(nodes); // send the request new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken, dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, nodes.length, block.getNumBytes(), bytesSent, newGS, checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile, - (targetPinnings == null ? false : targetPinnings[0]), targetPinnings); + (targetPinnings != null && targetPinnings[0]), targetPinnings); // receive ack for connect BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( PBHelperClient.vintPrefixed(blockReplyStream)); - pipelineStatus = resp.getStatus(); + Status pipelineStatus = resp.getStatus(); firstBadLink = resp.getFirstBadLink(); // Got an restart OOB ack. @@ -1600,7 +1593,7 @@ class DataStreamer extends Daemon { checkRestart = true; throw new IOException("A datanode is restarting."); } - + String logInfo = "ack with firstBadLink as " + firstBadLink; DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo); @@ -1612,7 +1605,8 @@ class DataStreamer extends Daemon { if (!errorState.isRestartingNode()) { LOG.info("Exception in createBlockOutputStream " + this, ie); } - if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { + if (ie instanceof InvalidEncryptionKeyException && + refetchEncryptionKey > 0) { LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + nodes[0] + " : " + ie); @@ -1634,14 +1628,15 @@ class DataStreamer extends Daemon { } } } else { - assert checkRestart == false; + assert !checkRestart; errorState.setBadNodeIndex(0); } final int i = errorState.getBadNodeIndex(); // Check whether there is a restart worth waiting for. if (checkRestart && shouldWaitForRestart(i)) { - errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]); + errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + + nodes[i]); } errorState.setInternalError(); lastException.set(ie); @@ -1651,7 +1646,6 @@ class DataStreamer extends Daemon { IOUtils.closeSocket(s); s = null; IOUtils.closeStream(out); - out = null; IOUtils.closeStream(blockReplyStream); blockReplyStream = null; } @@ -1660,19 +1654,18 @@ class DataStreamer extends Daemon { } } - private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) { + private boolean[] getPinnings(DatanodeInfo[] nodes) { if (favoredNodes == null) { return null; } else { boolean[] pinnings = new boolean[nodes.length]; - HashSet<String> favoredSet = - new HashSet<String>(Arrays.asList(favoredNodes)); + HashSet<String> favoredSet = new HashSet<>(Arrays.asList(favoredNodes)); for (int i = 0; i < nodes.length; i++) { pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname()); LOG.debug("{} was chosen by name node (favored={}).", nodes[i].getXferAddrWithHostname(), pinnings[i]); } - if (shouldLog && !favoredSet.isEmpty()) { + if (!favoredSet.isEmpty()) { // There is one or more favored nodes that were not allocated. LOG.warn("These favored nodes were specified but not chosen: " + favoredSet + " Specified favored nodes: " @@ -1782,7 +1775,7 @@ class DataStreamer extends Daemon { * For heartbeat packets, create buffer directly by new byte[] * since heartbeats should not be blocked. */ - private DFSPacket createHeartbeatPacket() throws InterruptedIOException { + private DFSPacket createHeartbeatPacket() { final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN]; return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, 0, false); } @@ -1794,7 +1787,8 @@ class DataStreamer extends Daemon { .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() { @Override public void onRemoval( - RemovalNotification<DatanodeInfo, DatanodeInfo> notification) { + @Nonnull RemovalNotification<DatanodeInfo, DatanodeInfo> + notification) { LOG.info("Removing node " + notification.getKey() + " from the excluded nodes list"); }
