Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 f3d0e5588 -> fc4fd38af

HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream (stateful read). Contributed by Jing Zhao


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fc4fd38a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fc4fd38a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fc4fd38a

Branch: refs/heads/HDFS-7285
Commit: fc4fd38af0ab566bbfe9f9523785ac553bc6610d
Parents: f3d0e55
Author: Zhe Zhang <z...@apache.org>
Authored: Wed Apr 29 15:53:31 2015 -0700
Committer: Zhe Zhang <z...@apache.org>
Committed: Wed Apr 29 15:53:31 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt   |   3 +
 .../hadoop/hdfs/DFSStripedInputStream.java | 336 ++++++++-----------
 2 files changed, 150 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc4fd38a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 9b4bf24..6a9bdee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -143,3 +143,6 @@
 
     HDFS-8235. Erasure Coding: Create DFSStripedInputStream in DFSClient#open.
     (Kai Sasaki via jing9)
+
+    HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream
+    (stateful read). (Jing Zhao via Zhe Zhang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc4fd38a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index f6f7ed2..3da7306 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -22,11 +22,8 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -126,23 +123,42 @@ public class DFSStripedInputStream extends DFSInputStream {
     return results;
   }
 
+  private static class ReaderRetryPolicy {
+    private int fetchEncryptionKeyTimes = 1;
+    private int fetchTokenTimes = 1;
+
+    void refetchEncryptionKey() {
+      fetchEncryptionKeyTimes--;
+    }
+
+    void refetchToken() {
+      fetchTokenTimes--;
+    }
+
+    boolean shouldRefetchEncryptionKey() {
+      return fetchEncryptionKeyTimes > 0;
+    }
+
+    boolean shouldRefetchToken() {
+      return fetchTokenTimes > 0;
+    }
+  }
+
   private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
-  private BlockReader[] blockReaders = null;
-  private DatanodeInfo[] currentNodes = null;
+  private final BlockReader[] blockReaders = new BlockReader[groupSize];
+  private final DatanodeInfo[] currentNodes = new DatanodeInfo[groupSize];
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
-  private final ECInfo ecInfo;
 
-  DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, ECInfo info)
-      throws IOException {
+  DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
+      ECInfo ecInfo) throws IOException {
     super(dfsClient, src, verifyChecksum);
     // ECInfo is restored from NN just before reading striped file.
-    assert info != null;
-    ecInfo = info;
+    assert ecInfo != null;
     cellSize = ecInfo.getSchema().getChunkSize();
-    dataBlkNum = (short)ecInfo.getSchema().getNumDataUnits();
-    parityBlkNum = (short)ecInfo.getSchema().getNumParityUnits();
+    dataBlkNum = (short) ecInfo.getSchema().getNumDataUnits();
+    parityBlkNum = (short) ecInfo.getSchema().getNumParityUnits();
     DFSClient.LOG.debug("Creating an striped input stream for file " + src);
   }
 
@@ -162,9 +178,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    * When seeking into a new block group, create blockReader for each internal
    * block in the group.
    */
-  @VisibleForTesting
-  private synchronized DatanodeInfo[] blockSeekTo(long target)
-      throws IOException {
+  private synchronized void blockSeekTo(long target) throws IOException {
     if (target >= getFileLength()) {
       throw new IOException("Attempted to read past end of file");
     }
@@ -172,18 +186,13 @@ public class DFSStripedInputStream extends DFSInputStream {
     // Will be getting a new BlockReader.
     closeCurrentBlockReaders();
 
-    // Connect to best DataNode for desired Block, with potential offset
-    DatanodeInfo[] chosenNodes = new DatanodeInfo[groupSize];
-    int refetchToken = 1; // only need to get a new access token once
-    int refetchEncryptionKey = 1; // only need to get a new encryption key once
-
     // Compute desired striped block group
     LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
-
     // Update current position
     this.pos = target;
     this.blockEnd = targetBlockGroup.getStartOffset() +
         targetBlockGroup.getBlockSize() - 1;
+    currentLocatedBlock = targetBlockGroup;
 
     long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
     LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
@@ -191,71 +200,50 @@ public class DFSStripedInputStream extends DFSInputStream {
     // The purpose is to get start offset into each block
     ReadPortion[] readPortions = planReadPortions(groupSize, cellSize,
         offsetIntoBlockGroup, 0, 0);
+
+    final ReaderRetryPolicy retry = new ReaderRetryPolicy();
+    for (int i = 0; i < groupSize; i++) {
+      LocatedBlock targetBlock = targetBlocks[i];
+      if (targetBlock != null) {
+        DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
+        if (retval != null) {
+          currentNodes[i] = retval.info;
+          blockReaders[i] = getBlockReaderWithRetry(targetBlock,
+              readPortions[i].startOffsetInBlock,
+              targetBlock.getBlockSize() - readPortions[i].startOffsetInBlock,
+              retval.addr, retval.storageType, retval.info, target, retry);
+        }
+      }
+    }
+  }
+
+  private BlockReader getBlockReaderWithRetry(LocatedBlock targetBlock,
+      long offsetInBlock, long length, InetSocketAddress targetAddr,
+      StorageType storageType, DatanodeInfo datanode, long offsetInFile,
+      ReaderRetryPolicy retry) throws IOException {
+    // only need to get a new access token or a new encryption key once
     while (true) {
-      int i = 0;
-      InetSocketAddress targetAddr = null;
       try {
-        blockReaders = new BlockReader[groupSize];
-        for (i = 0; i < groupSize; i++) {
-          LocatedBlock targetBlock = targetBlocks[i];
-          if (targetBlock == null) {
-            continue;
-          }
-          long offsetIntoBlock = readPortions[i].startOffsetInBlock;
-          DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
-          chosenNodes[i] = retval.info;
-          targetAddr = retval.addr;
-          StorageType storageType = retval.storageType;
-
-          ExtendedBlock blk = targetBlock.getBlock();
-          Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-          CachingStrategy curCachingStrategy;
-          boolean shortCircuitForbidden;
-          synchronized(infoLock) {
-            curCachingStrategy = cachingStrategy;
-            shortCircuitForbidden = shortCircuitForbidden();
-          }
-          blockReaders[i] = new BlockReaderFactory(dfsClient.getConf()).
-              setInetSocketAddress(targetAddr).
-              setRemotePeerFactory(dfsClient).
-              setDatanodeInfo(chosenNodes[i]).
-              setStorageType(storageType).
-              setFileName(src).
-              setBlock(blk).
-              setBlockToken(accessToken).
-              setStartOffset(offsetIntoBlock).
-              setVerifyChecksum(verifyChecksum).
-              setClientName(dfsClient.clientName).
-              setLength(blk.getNumBytes() - offsetIntoBlock).
-              setCachingStrategy(curCachingStrategy).
-              setAllowShortCircuitLocalReads(!shortCircuitForbidden).
-              setClientCacheContext(dfsClient.getClientContext()).
-              setUserGroupInformation(dfsClient.ugi).
-              setConfiguration(dfsClient.getConfiguration()).
-              build();
-        }
-        currentLocatedBlock = targetBlockGroup;
-        return chosenNodes;
-      } catch (IOException ex) {
-        // Retry in case of encryption key or token exceptions. Otherwise throw
-        // IOException: since each internal block is singly replicated, it's
-        // not meaningful trying to locate another replica.
-        if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+        return getBlockReader(targetBlock, offsetInBlock, length, targetAddr,
+            storageType, datanode);
+      } catch (IOException e) {
+        if (e instanceof InvalidEncryptionKeyException &&
+            retry.shouldRefetchEncryptionKey()) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + targetAddr
-              + " : " + ex);
-          // The encryption key used is invalid.
-          refetchEncryptionKey--;
+              + " : " + e);
           dfsClient.clearDataEncryptionKey();
-        } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
-          refetchToken--;
-          fetchBlockAt(target);
+          retry.refetchEncryptionKey();
+        } else if (retry.shouldRefetchToken() &&
+            tokenRefetchNeeded(e, targetAddr)) {
+          fetchBlockAt(offsetInFile);
+          retry.refetchToken();
         } else {
           DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
-              + ", add to deadNodes and continue. " + ex, ex);
" + ex, ex); - // Put chosen node into dead list and throw exception - addToDeadNodes(chosenNodes[i]); - throw ex; + + ", add to deadNodes and continue.", e); + // Put chosen node into dead list, continue + addToDeadNodes(datanode); + return null; } } } @@ -272,15 +260,15 @@ public class DFSStripedInputStream extends DFSInputStream { return; } for (int i = 0; i < groupSize; i++) { - if (blockReaders[i] == null) { - continue; - } - try { - blockReaders[i].close(); - } catch (IOException e) { - DFSClient.LOG.error("error closing blockReader", e); + if (blockReaders[i] != null) { + try { + blockReaders[i].close(); + } catch (IOException e) { + DFSClient.LOG.error("error closing blockReader", e); + } + blockReaders[i] = null; } - blockReaders[i] = null; + currentNodes[i] = null; } blockEnd = -1; } @@ -292,123 +280,93 @@ public class DFSStripedInputStream extends DFSInputStream { if (closed.get()) { throw new IOException("Stream closed"); } - Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap - = new HashMap<>(); + Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>(); failures = 0; if (pos < getFileLength()) { - int retries = 2; /** Index of the target block in a stripe to read from */ int idxInGroup = (int) ((pos / cellSize) % dataBlkNum); - while (retries > 0) { - try { - // currentNode can be left as null if previous read had a checksum - // error on the same block. See HDFS-3067 - if (pos > blockEnd || currentNodes == null) { - currentNodes = blockSeekTo(pos); - } - int realLen = (int) Math.min(len, (blockEnd - pos + 1L)); - synchronized(infoLock) { - if (locatedBlocks.isLastBlockComplete()) { - realLen = (int) Math.min(realLen, - locatedBlocks.getFileLength() - pos); - } + try { + if (pos > blockEnd) { + blockSeekTo(pos); + } + int realLen = (int) Math.min(len, (blockEnd - pos + 1L)); + synchronized (infoLock) { + if (locatedBlocks.isLastBlockComplete()) { + realLen = (int) Math.min(realLen, + locatedBlocks.getFileLength() - pos); } + } - /** Number of bytes already read into buffer */ - int result = 0; - while (result < realLen) { - /** - * Temporary position into the file; {@link pos} might not proceed - * to this temporary position in case of exceptions. - */ - long tmpPos = pos + result; - /** Start and end offsets of a cell in the file */ - long cellStart = (tmpPos / cellSize) * cellSize; - long cellEnd = cellStart + cellSize - 1; - - /** Number of bytes to read from the current cell */ - int realLenInCell = (int) Math.min(realLen - result, - cellEnd - tmpPos + 1L); - assert realLenInCell > 0 : "Temporary position shouldn't be " + - "after cellEnd"; - // Read from one blockReader up to cell boundary - int cellRet = readBuffer(blockReaders[idxInGroup], - currentNodes[idxInGroup], strategy, off + result, - realLenInCell); - if (cellRet >= 0) { - result += cellRet; - if (cellRet < realLenInCell) { - // A short read indicates the current blockReader buffer is - // already drained. Should return the read call. Otherwise - // should proceed to the next cell. - break; - } - } else { - // got a EOS from reader though we expect more data on it. - throw new IOException("Unexpected EOS from the reader"); + /** Number of bytes already read into buffer */ + int result = 0; + while (result < realLen) { + /** + * Temporary position into the file; {@link pos} might not proceed + * to this temporary position in case of exceptions. 
+           */
+          long tmpPos = pos + result;
+          /** Start and end offsets of a cell in the file */
+          long cellStart = (tmpPos / cellSize) * cellSize;
+          long cellEnd = cellStart + cellSize - 1;
+
+          /** Number of bytes to read from the current cell */
+          int realLenInCell = (int) Math.min(realLen - result,
+              cellEnd - tmpPos + 1L);
+          assert realLenInCell > 0 : "Temporary position shouldn't be " +
+              "after cellEnd";
+
+          // Read from one blockReader up to cell boundary
+          int cellRet = readBuffer(blockReaders[idxInGroup],
+              currentNodes[idxInGroup], strategy, off + result, realLenInCell,
+              corruptedBlockMap);
+          if (cellRet >= 0) {
+            result += cellRet;
+            if (cellRet < realLenInCell) {
+              // A short read indicates the current blockReader buffer is
+              // already drained. Should return the read call. Otherwise
+              // should proceed to the next cell.
+              break;
            }
-            idxInGroup = (idxInGroup + 1) % dataBlkNum;
-          }
-
-          pos += result;
-
-          if (dfsClient.stats != null) {
-            dfsClient.stats.incrementBytesRead(result);
-          }
-          return result;
-        } catch (ChecksumException ce) {
-          throw ce;
-        } catch (IOException e) {
-          if (retries == 1) {
-            DFSClient.LOG.warn("DFS Read", e);
-          }
-          blockEnd = -1;
-          if (currentNodes[idxInGroup] != null) {
-            addToDeadNodes(currentNodes[idxInGroup]);
+          } else {
+            // got a EOS from reader though we expect more data on it.
+            throw new IOException("Unexpected EOS from the reader");
          }
-          if (--retries == 0) {
-            throw e;
-          }
-        } finally {
-          // Check if need to report block replicas corruption either read
-          // was successful or ChecksumException occured.
-          reportCheckSumFailure(corruptedBlockMap,
-              currentLocatedBlock.getLocations().length);
+          idxInGroup = (idxInGroup + 1) % dataBlkNum;
        }
+        pos += result;
+        if (dfsClient.stats != null) {
+          dfsClient.stats.incrementBytesRead(result);
+        }
+        return result;
+      } finally {
+        // Check if need to report block replicas corruption either read
+        // was successful or ChecksumException occured.
+        reportCheckSumFailure(corruptedBlockMap,
+            currentLocatedBlock.getLocations().length);
      }
    }
    return -1;
  }

   private synchronized int readBuffer(BlockReader blockReader,
-      DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len)
-      throws IOException {
-    IOException ioe;
-    while (true) {
-      try {
-        return readerStrategy.doRead(blockReader, off, len);
-      } catch ( ChecksumException ce ) {
-        DFSClient.LOG.warn("Found Checksum error for "
-            + getCurrentBlock() + " from " + currentNode
-            + " at " + ce.getPos());
-        // If current block group is corrupt, it's meaningless to retry.
-        // TODO: this should trigger decoding logic (HDFS-7678)
-        throw ce;
-      } catch ( IOException e ) {
-        ioe = e;
-      }
-
-      boolean sourceFound = seekToBlockSource(pos);
-      if (!sourceFound) {
-        throw ioe;
-      }
+      DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    try {
+      return readerStrategy.doRead(blockReader, off, len);
+    } catch ( ChecksumException ce ) {
+      DFSClient.LOG.warn("Found Checksum error for "
+          + getCurrentBlock() + " from " + currentNode
+          + " at " + ce.getPos());
+      // we want to remember which block replicas we have tried
+      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+          corruptedBlockMap);
+    } catch (IOException e) {
+      DFSClient.LOG.warn("Exception while reading from "
+          + getCurrentBlock() + " of " + src + " from "
+          + currentNode, e);
    }
-  }
-
-  private boolean seekToBlockSource(long targetPos)
-      throws IOException {
-    currentNodes = blockSeekTo(targetPos);
-    return true;
+    // TODO: this should trigger decoding logic (HDFS-7678)
+    return -1;
  }

   protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
@@ -418,7 +376,7 @@
 
     @Override
     public int doRead(BlockReader blockReader, int off, int len)
-        throws ChecksumException, IOException {
+        throws IOException {
       int oldlimit = buf.limit();
       if (buf.remaining() > len) {
         buf.limit(buf.position() + len);
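
----------------------------------------------------------------------
Note on the retry change: the patch replaces the refetchToken /
refetchEncryptionKey counters that the old blockSeekTo() decremented inline
with the small ReaderRetryPolicy class, and moves the connect loop into
getBlockReaderWithRetry(), which now gives up on a node (returns null and
adds it to deadNodes) instead of throwing. The sketch below is a minimal,
self-contained illustration of that one-shot retry pattern; connectOnce()
and StaleTokenException are hypothetical stand-ins for getBlockReader() and
the token/encryption-key failures, not Hadoop APIs.

import java.io.IOException;

public class RetryPolicySketch {

  // Same shape as the ReaderRetryPolicy in the patch: each recovery action
  // (refetching a token or an encryption key) may run exactly once.
  static class ReaderRetryPolicy {
    private int fetchEncryptionKeyTimes = 1;
    private int fetchTokenTimes = 1;

    void refetchEncryptionKey() { fetchEncryptionKeyTimes--; }
    void refetchToken() { fetchTokenTimes--; }
    boolean shouldRefetchEncryptionKey() { return fetchEncryptionKeyTimes > 0; }
    boolean shouldRefetchToken() { return fetchTokenTimes > 0; }
  }

  // Hypothetical failure type standing in for an expired block token.
  static class StaleTokenException extends IOException {}

  private static int attempts = 0;

  // Hypothetical connect step standing in for getBlockReader():
  // fails once with a stale token, then succeeds.
  static String connectOnce() throws IOException {
    if (attempts++ == 0) {
      throw new StaleTokenException();
    }
    return "reader";
  }

  static String connectWithRetry(ReaderRetryPolicy retry) throws IOException {
    while (true) {
      try {
        return connectOnce();
      } catch (StaleTokenException e) {
        if (retry.shouldRefetchToken()) {
          retry.refetchToken(); // refresh credentials (fetchBlockAt in the patch)
        } else {
          return null; // give up on this node, as getBlockReaderWithRetry does
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(connectWithRetry(new ReaderRetryPolicy())); // prints "reader"
  }
}

One design point visible in the diff: blockSeekTo() shares a single
ReaderRetryPolicy instance across all internal blocks of the group, so the
one-shot refetch budget applies to the whole block group rather than to
each reader individually.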
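
Note on the stateful read path: readWithStrategy() now advances cell by
cell inside a single try/finally instead of the old two-attempt retry loop,
and a checksum error in readBuffer() is recorded in corruptedBlockMap
rather than rethrown. The arithmetic that picks the internal block and the
cell boundary is easiest to see with concrete numbers; the sketch below
replays the same expressions outside the stream, with an illustrative cell
size and data-block count rather than values taken from an ECSchema.

public class StripeMathSketch {
  public static void main(String[] args) {
    final long cellSize = 64 * 1024; // chunk size from the EC schema (assumed)
    final int dataBlkNum = 6;        // number of data units (assumed)

    long pos = 200_000;    // current stream position in the file
    int realLen = 100_000; // bytes requested by the caller
    int result = 0;        // bytes already read

    while (result < realLen) {
      long tmpPos = pos + result;
      // Index of the internal block serving this position; equivalent to the
      // patch's idxInGroup, which is computed once from pos and then
      // incremented modulo dataBlkNum as each full cell is consumed.
      int idxInGroup = (int) ((tmpPos / cellSize) % dataBlkNum);
      // Start and end offsets of the cell containing tmpPos:
      long cellStart = (tmpPos / cellSize) * cellSize;
      long cellEnd = cellStart + cellSize - 1;
      // One readBuffer() call reads at most up to the cell boundary:
      int realLenInCell = (int) Math.min(realLen - result, cellEnd - tmpPos + 1L);

      System.out.printf("read %6d bytes of cell [%d, %d] from block %d%n",
          realLenInCell, cellStart, cellEnd, idxInGroup);
      result += realLenInCell;
    }
  }
}

With these numbers the 100,000-byte request spans two cells: 62,144 bytes
from block 3 (finishing cell [196608, 262143]) and then 37,856 bytes from
block 4.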