HDFS-10861. Refactor StripeReaders and use ECChunk version decode API. Contributed by Sammi Chen
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/734d54c1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/734d54c1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/734d54c1 Branch: refs/heads/HADOOP-12756 Commit: 734d54c1a8950446e68098f62d8964e02ecc2890 Parents: 2b66d9e Author: Kai Zheng <kai.zh...@intel.com> Authored: Wed Sep 21 21:34:48 2016 +0800 Committer: Kai Zheng <kai.zh...@intel.com> Committed: Wed Sep 21 21:34:48 2016 +0800 ---------------------------------------------------------------------- .../apache/hadoop/io/ElasticByteBufferPool.java | 2 +- .../apache/hadoop/io/erasurecode/ECChunk.java | 22 + .../io/erasurecode/rawcoder/CoderUtil.java | 3 + .../org/apache/hadoop/hdfs/DFSInputStream.java | 20 +- .../hadoop/hdfs/DFSStripedInputStream.java | 654 +++---------------- .../hadoop/hdfs/PositionStripeReader.java | 104 +++ .../hadoop/hdfs/StatefulStripeReader.java | 95 +++ .../org/apache/hadoop/hdfs/StripeReader.java | 463 +++++++++++++ .../hadoop/hdfs/util/StripedBlockUtil.java | 158 ++--- .../hadoop/hdfs/util/TestStripedBlockUtil.java | 1 - 10 files changed, 844 insertions(+), 678 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java index c35d608..023f37f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java @@ -85,7 +85,7 @@ public final class 
ElasticByteBufferPool implements ByteBufferPool { private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) { return direct ? directBuffers : buffers; } - + @Override public synchronized ByteBuffer getBuffer(boolean direct, int length) { TreeMap<Key, ByteBuffer> tree = getBufferTree(direct); http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java index cd7c6be..536715b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java @@ -29,6 +29,9 @@ public class ECChunk { private ByteBuffer chunkBuffer; + // TODO: should be in a more general flags + private boolean allZero = false; + /** * Wrapping a ByteBuffer * @param buffer buffer to be wrapped by the chunk @@ -37,6 +40,13 @@ public class ECChunk { this.chunkBuffer = buffer; } + public ECChunk(ByteBuffer buffer, int offset, int len) { + ByteBuffer tmp = buffer.duplicate(); + tmp.position(offset); + tmp.limit(offset + len); + this.chunkBuffer = tmp.slice(); + } + /** * Wrapping a bytes array * @param buffer buffer to be wrapped by the chunk @@ -45,6 +55,18 @@ public class ECChunk { this.chunkBuffer = ByteBuffer.wrap(buffer); } + public ECChunk(byte[] buffer, int offset, int len) { + this.chunkBuffer = ByteBuffer.wrap(buffer, offset, len); + } + + public boolean isAllZero() { + return allZero; + } + + public void setAllZero(boolean allZero) { + this.allZero = allZero; + } + /** * Convert to ByteBuffer * @return ByteBuffer 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java index b22d44f..ef34639 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java @@ -115,6 +115,9 @@ final class CoderUtil { buffers[i] = null; } else { buffers[i] = chunk.getBuffer(); + if (chunk.isAllZero()) { + CoderUtil.resetBuffer(buffers[i], buffers[i].remaining()); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 31fa897..dbffc64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -240,7 +240,7 @@ public class DFSInputStream extends FSInputStream Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator(); Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator(); while (oldIter.hasNext() && newIter.hasNext()) { - if (! 
oldIter.next().getBlock().equals(newIter.next().getBlock())) { + if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) { throw new IOException("Blocklist for " + src + " has changed!"); } } @@ -677,8 +677,8 @@ public class DFSInputStream extends FSInputStream if (oneByteBuf == null) { oneByteBuf = new byte[1]; } - int ret = read( oneByteBuf, 0, 1 ); - return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff); + int ret = read(oneByteBuf, 0, 1); + return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff); } /* This is a used by regular read() and handles ChecksumExceptions. @@ -702,7 +702,7 @@ public class DFSInputStream extends FSInputStream // retry as many times as seekToNewSource allows. try { return reader.readFromBlock(blockReader, len); - } catch ( ChecksumException ce ) { + } catch (ChecksumException ce) { DFSClient.LOG.warn("Found Checksum error for " + getCurrentBlock() + " from " + currentNode + " at " + ce.getPos()); @@ -710,7 +710,7 @@ public class DFSInputStream extends FSInputStream retryCurrentNode = false; // we want to remember which block replicas we have tried corruptedBlocks.addCorruptedBlock(getCurrentBlock(), currentNode); - } catch ( IOException e ) { + } catch (IOException e) { if (!retryCurrentNode) { DFSClient.LOG.warn("Exception while reading from " + getCurrentBlock() + " of " + src + " from " @@ -779,7 +779,9 @@ public class DFSInputStream extends FSInputStream DFSClient.LOG.warn("DFS Read", e); } blockEnd = -1; - if (currentNode != null) { addToDeadNodes(currentNode); } + if (currentNode != null) { + addToDeadNodes(currentNode); + } if (--retries == 0) { throw e; } @@ -1397,10 +1399,10 @@ public class DFSInputStream extends FSInputStream @Override public long skip(long n) throws IOException { - if ( n > 0 ) { + if (n > 0) { long curPos = getPos(); long fileLen = getFileLength(); - if( n+curPos > fileLen ) { + if (n+curPos > fileLen) { n = fileLen - curPos; } seek(curPos+n); @@ -1550,7 +1552,7 @@ public class DFSInputStream extends 
FSInputStream * Get statistics about the reads which this DFSInputStream has done. */ public ReadStatistics getReadStatistics() { - return new ReadStatistics(readStatistics); + return readStatistics; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index ccaf6a7..922f74e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -17,24 +17,21 @@ */ package org.apache.hadoop.hdfs; -import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; +import org.apache.hadoop.hdfs.StripeReader.BlockReaderInfo; +import org.apache.hadoop.hdfs.StripeReader.ReaderRetryPolicy; import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripeRange; import org.apache.hadoop.io.ByteBufferPool; -import static 
org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; - import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; @@ -44,7 +41,6 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; import java.io.EOFException; import java.io.IOException; -import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -53,111 +49,32 @@ import java.util.EnumSet; import java.util.List; import java.util.Set; import java.util.Collection; -import java.util.Map; -import java.util.HashMap; -import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; /** - * DFSStripedInputStream reads from striped block groups + * DFSStripedInputStream reads from striped block groups. 
*/ @InterfaceAudience.Private public class DFSStripedInputStream extends DFSInputStream { - private static class ReaderRetryPolicy { - private int fetchEncryptionKeyTimes = 1; - private int fetchTokenTimes = 1; - - void refetchEncryptionKey() { - fetchEncryptionKeyTimes--; - } - - void refetchToken() { - fetchTokenTimes--; - } - - boolean shouldRefetchEncryptionKey() { - return fetchEncryptionKeyTimes > 0; - } - - boolean shouldRefetchToken() { - return fetchTokenTimes > 0; - } - } - - /** Used to indicate the buffered data's range in the block group */ - private static class StripeRange { - /** start offset in the block group (inclusive) */ - final long offsetInBlock; - /** length of the stripe range */ - final long length; - - StripeRange(long offsetInBlock, long length) { - Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0); - this.offsetInBlock = offsetInBlock; - this.length = length; - } - - boolean include(long pos) { - return pos >= offsetInBlock && pos < offsetInBlock + length; - } - } - - private static class BlockReaderInfo { - final BlockReader reader; - final DatanodeInfo datanode; - /** - * when initializing block readers, their starting offsets are set to the same - * number: the smallest internal block offsets among all the readers. This is - * because it is possible that for some internal blocks we have to read - * "backwards" for decoding purpose. We thus use this offset array to track - * offsets for all the block readers so that we can skip data if necessary. - */ - long blockReaderOffset; - /** - * We use this field to indicate whether we should use this reader. In case - * we hit any issue with this reader, we set this field to true and avoid - * using it for the next stripe. 
- */ - boolean shouldSkip = false; - - BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) { - this.reader = reader; - this.datanode = dn; - this.blockReaderOffset = offset; - } - - void setOffset(long offset) { - this.blockReaderOffset = offset; - } - - void skip() { - this.shouldSkip = true; - } - } - private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool(); - private final BlockReaderInfo[] blockReaders; private final int cellSize; private final short dataBlkNum; private final short parityBlkNum; private final int groupSize; - /** the buffer for a complete stripe */ + /** the buffer for a complete stripe. */ private ByteBuffer curStripeBuf; private ByteBuffer parityBuf; private final ErasureCodingPolicy ecPolicy; private final RawErasureDecoder decoder; /** - * indicate the start/end offset of the current buffered stripe in the - * block group + * Indicate the start/end offset of the current buffered stripe in the + * block group. */ private StripeRange curStripeRange; - private final CompletionService<Void> readingService; /** * When warning the user of a lost block in striping mode, we remember the @@ -167,8 +84,8 @@ public class DFSStripedInputStream extends DFSInputStream { * * To minimize the overhead, we only store the datanodeUuid in this set */ - private final Set<String> warnedNodes = Collections.newSetFromMap( - new ConcurrentHashMap<String, Boolean>()); + private final Set<String> warnedNodes = + Collections.newSetFromMap(new ConcurrentHashMap<>()); DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, ErasureCodingPolicy ecPolicy, @@ -183,8 +100,6 @@ public class DFSStripedInputStream extends DFSInputStream { groupSize = dataBlkNum + parityBlkNum; blockReaders = new BlockReaderInfo[groupSize]; curStripeRange = new StripeRange(0, 0); - readingService = - new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool()); ErasureCoderOptions coderOptions = new ErasureCoderOptions( 
dataBlkNum, parityBlkNum); decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(), @@ -198,7 +113,7 @@ public class DFSStripedInputStream extends DFSInputStream { return decoder.preferDirectBuffer(); } - private void resetCurStripeBuffer() { + void resetCurStripeBuffer() { if (curStripeBuf == null) { curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(), cellSize * dataBlkNum); @@ -207,7 +122,7 @@ public class DFSStripedInputStream extends DFSInputStream { curStripeRange = new StripeRange(0, 0); } - private ByteBuffer getParityBuffer() { + protected ByteBuffer getParityBuffer() { if (parityBuf == null) { parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(), cellSize * parityBlkNum); @@ -216,6 +131,29 @@ public class DFSStripedInputStream extends DFSInputStream { return parityBuf; } + protected ByteBuffer getCurStripeBuf() { + return curStripeBuf; + } + + protected String getSrc() { + return src; + } + + protected DFSClient getDFSClient() { + return dfsClient; + } + + protected LocatedBlocks getLocatedBlocks() { + return locatedBlocks; + } + + protected ByteBufferPool getBufferPool() { + return BUFFER_POOL; + } + + protected ThreadPoolExecutor getStripedReadsThreadPool(){ + return dfsClient.getStripedReadsThreadPool(); + } /** * When seeking into a new block group, create blockReader for each internal * block in the group. 
@@ -268,7 +206,7 @@ public class DFSStripedInputStream extends DFSInputStream { blockEnd = -1; } - private void closeReader(BlockReaderInfo readerInfo) { + protected void closeReader(BlockReaderInfo readerInfo) { if (readerInfo != null) { if (readerInfo.reader != null) { try { @@ -288,6 +226,59 @@ public class DFSStripedInputStream extends DFSInputStream { return pos - currentLocatedBlock.getStartOffset(); } + boolean createBlockReader(LocatedBlock block, long offsetInBlock, + LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos, + int chunkIndex) throws IOException { + BlockReader reader = null; + final ReaderRetryPolicy retry = new ReaderRetryPolicy(); + DFSInputStream.DNAddrPair dnInfo = + new DFSInputStream.DNAddrPair(null, null, null); + + while (true) { + try { + // the cached block location might have been re-fetched, so always + // get it from cache. + block = refreshLocatedBlock(block); + targetBlocks[chunkIndex] = block; + + // internal block has one location, just rule out the deadNodes + dnInfo = getBestNodeDNAddrPair(block, null); + if (dnInfo == null) { + break; + } + reader = getBlockReader(block, offsetInBlock, + block.getBlockSize() - offsetInBlock, + dnInfo.addr, dnInfo.storageType, dnInfo.info); + } catch (IOException e) { + if (e instanceof InvalidEncryptionKeyException && + retry.shouldRefetchEncryptionKey()) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + dnInfo.addr + + " : " + e); + dfsClient.clearDataEncryptionKey(); + retry.refetchEncryptionKey(); + } else if (retry.shouldRefetchToken() && + tokenRefetchNeeded(e, dnInfo.addr)) { + fetchBlockAt(block.getStartOffset()); + retry.refetchToken(); + } else { + //TODO: handles connection issues + DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " + + "block" + block.getBlock(), e); + // re-fetch the block in case the block has been moved + fetchBlockAt(block.getStartOffset()); + 
addToDeadNodes(dnInfo.info); + } + } + if (reader != null) { + readerInfos[chunkIndex] = + new BlockReaderInfo(reader, dnInfo.info, offsetInBlock); + return true; + } + } + return false; + } + /** * Read a new stripe covering the current position, and store the data in the * {@link #curStripeBuf}. @@ -303,20 +294,20 @@ public class DFSStripedInputStream extends DFSInputStream { final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen); final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize() - (stripeIndex * stripeLen), stripeLen); - StripeRange stripeRange = new StripeRange(offsetInBlockGroup, - stripeLimit - stripeBufOffset); + StripeRange stripeRange = + new StripeRange(offsetInBlockGroup, stripeLimit - stripeBufOffset); LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock; AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy, cellSize, blockGroup, offsetInBlockGroup, - offsetInBlockGroup + stripeRange.length - 1, curStripeBuf); + offsetInBlockGroup + stripeRange.getLength() - 1, curStripeBuf); final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( blockGroup, cellSize, dataBlkNum, parityBlkNum); // read the whole stripe for (AlignedStripe stripe : stripes) { // Parse group to get chosen DN location - StripeReader sreader = new StatefulStripeReader(readingService, stripe, - blks, blockReaders, corruptedBlocks); + StripeReader sreader = new StatefulStripeReader(stripe, ecPolicy, blks, + blockReaders, corruptedBlocks, decoder, this); sreader.readStripe(); } curStripeBuf.position(stripeBufOffset); @@ -324,69 +315,8 @@ public class DFSStripedInputStream extends DFSInputStream { curStripeRange = stripeRange; } - private Callable<Void> readCells(final BlockReader reader, - final DatanodeInfo datanode, final long currentReaderOffset, - final long targetReaderOffset, final ByteBufferStrategy[] strategies, - final ExtendedBlock currentBlock, - final CorruptedBlocks corruptedBlocks) { - return 
new Callable<Void>() { - @Override - public Void call() throws Exception { - // reader can be null if getBlockReaderWithRetry failed or - // the reader hit exception before - if (reader == null) { - throw new IOException("The BlockReader is null. " + - "The BlockReader creation failed or the reader hit exception."); - } - Preconditions.checkState(currentReaderOffset <= targetReaderOffset); - if (currentReaderOffset < targetReaderOffset) { - long skipped = reader.skip(targetReaderOffset - currentReaderOffset); - Preconditions.checkState( - skipped == targetReaderOffset - currentReaderOffset); - } - int result = 0; - for (ByteBufferStrategy strategy : strategies) { - result += readToBuffer(reader, datanode, strategy, currentBlock, - corruptedBlocks); - } - return null; - } - }; - } - - private int readToBuffer(BlockReader blockReader, - DatanodeInfo currentNode, ByteBufferStrategy strategy, - ExtendedBlock currentBlock, - CorruptedBlocks corruptedBlocks) - throws IOException { - final int targetLength = strategy.getTargetLength(); - int length = 0; - try { - while (length < targetLength) { - int ret = strategy.readFromBlock(blockReader); - if (ret < 0) { - throw new IOException("Unexpected EOS from the reader"); - } - length += ret; - } - return length; - } catch (ChecksumException ce) { - DFSClient.LOG.warn("Found Checksum error for " - + currentBlock + " from " + currentNode - + " at " + ce.getPos()); - // we want to remember which block replicas we have tried - corruptedBlocks.addCorruptedBlock(currentBlock, currentNode); - throw ce; - } catch (IOException e) { - DFSClient.LOG.warn("Exception while reading from " - + currentBlock + " of " + src + " from " - + currentNode, e); - throw e; - } - } - /** - * Seek to a new arbitrary location + * Seek to a new arbitrary location. 
*/ @Override public synchronized void seek(long targetPos) throws IOException { @@ -469,7 +399,7 @@ public class DFSStripedInputStream extends DFSInputStream { } /** - * Copy the data from {@link #curStripeBuf} into the given buffer + * Copy the data from {@link #curStripeBuf} into the given buffer. * @param strategy the ReaderStrategy containing the given buffer * @param length target length * @return number of bytes copied @@ -530,17 +460,19 @@ public class DFSStripedInputStream extends DFSInputStream { AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes( ecPolicy, cellSize, blockGroup, start, end, buf); - CompletionService<Void> readService = new ExecutorCompletionService<>( - dfsClient.getStripedReadsThreadPool()); final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( blockGroup, cellSize, dataBlkNum, parityBlkNum); final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize]; try { for (AlignedStripe stripe : stripes) { // Parse group to get chosen DN location - StripeReader preader = new PositionStripeReader(readService, stripe, - blks, preaderInfos, corruptedBlocks); - preader.readStripe(); + StripeReader preader = new PositionStripeReader(stripe, ecPolicy, blks, + preaderInfos, corruptedBlocks, decoder, this); + try { + preader.readStripe(); + } finally { + preader.close(); + } } buf.position(buf.position() + (int)(end - start + 1)); } finally { @@ -571,376 +503,6 @@ public class DFSStripedInputStream extends DFSInputStream { } /** - * The reader for reading a complete {@link AlignedStripe}. Note that an - * {@link AlignedStripe} may cross multiple stripes with cellSize width. 
- */ - private abstract class StripeReader { - final Map<Future<Void>, Integer> futures = new HashMap<>(); - final AlignedStripe alignedStripe; - final CompletionService<Void> service; - final LocatedBlock[] targetBlocks; - final CorruptedBlocks corruptedBlocks; - final BlockReaderInfo[] readerInfos; - - StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe, - LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos, - CorruptedBlocks corruptedBlocks) { - this.service = service; - this.alignedStripe = alignedStripe; - this.targetBlocks = targetBlocks; - this.readerInfos = readerInfos; - this.corruptedBlocks = corruptedBlocks; - } - - /** prepare all the data chunks */ - abstract void prepareDecodeInputs(); - - /** prepare the parity chunk and block reader if necessary */ - abstract boolean prepareParityChunk(int index); - - abstract void decode(); - - void updateState4SuccessRead(StripingChunkReadResult result) { - Preconditions.checkArgument( - result.state == StripingChunkReadResult.SUCCESSFUL); - readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock() - + alignedStripe.getSpanInBlock()); - } - - private void checkMissingBlocks() throws IOException { - if (alignedStripe.missingChunksNum > parityBlkNum) { - clearFutures(futures.keySet()); - throw new IOException(alignedStripe.missingChunksNum - + " missing blocks, the stripe is: " + alignedStripe - + "; locatedBlocks is: " + locatedBlocks); - } - } - - /** - * We need decoding. Thus go through all the data chunks and make sure we - * submit read requests for all of them. 
- */ - private void readDataForDecoding() throws IOException { - prepareDecodeInputs(); - for (int i = 0; i < dataBlkNum; i++) { - Preconditions.checkNotNull(alignedStripe.chunks[i]); - if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) { - if (!readChunk(targetBlocks[i], i)) { - alignedStripe.missingChunksNum++; - } - } - } - checkMissingBlocks(); - } - - void readParityChunks(int num) throws IOException { - for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num; - i++) { - if (alignedStripe.chunks[i] == null) { - if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) { - j++; - } else { - alignedStripe.missingChunksNum++; - } - } - } - checkMissingBlocks(); - } - - boolean createBlockReader(LocatedBlock block, int chunkIndex) - throws IOException { - BlockReader reader = null; - final ReaderRetryPolicy retry = new ReaderRetryPolicy(); - DNAddrPair dnInfo = new DNAddrPair(null, null, null); - - while(true) { - try { - // the cached block location might have been re-fetched, so always - // get it from cache. 
- block = refreshLocatedBlock(block); - targetBlocks[chunkIndex] = block; - - // internal block has one location, just rule out the deadNodes - dnInfo = getBestNodeDNAddrPair(block, null); - if (dnInfo == null) { - break; - } - reader = getBlockReader(block, alignedStripe.getOffsetInBlock(), - block.getBlockSize() - alignedStripe.getOffsetInBlock(), - dnInfo.addr, dnInfo.storageType, dnInfo.info); - } catch (IOException e) { - if (e instanceof InvalidEncryptionKeyException && - retry.shouldRefetchEncryptionKey()) { - DFSClient.LOG.info("Will fetch a new encryption key and retry, " - + "encryption key was invalid when connecting to " + dnInfo.addr - + " : " + e); - dfsClient.clearDataEncryptionKey(); - retry.refetchEncryptionKey(); - } else if (retry.shouldRefetchToken() && - tokenRefetchNeeded(e, dnInfo.addr)) { - fetchBlockAt(block.getStartOffset()); - retry.refetchToken(); - } else { - //TODO: handles connection issues - DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " + - "block" + block.getBlock(), e); - // re-fetch the block in case the block has been moved - fetchBlockAt(block.getStartOffset()); - addToDeadNodes(dnInfo.info); - } - } - if (reader != null) { - readerInfos[chunkIndex] = new BlockReaderInfo(reader, dnInfo.info, - alignedStripe.getOffsetInBlock()); - return true; - } - } - return false; - } - - private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) { - if (chunk.useByteBuffer()) { - ByteBufferStrategy strategy = new ByteBufferStrategy( - chunk.getByteBuffer(), readStatistics, dfsClient); - return new ByteBufferStrategy[]{strategy}; - } else { - ByteBufferStrategy[] strategies = - new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()]; - for (int i = 0; i < strategies.length; i++) { - ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i); - strategies[i] = - new ByteBufferStrategy(buffer, readStatistics, dfsClient); - } - return strategies; - } - } - - boolean readChunk(final LocatedBlock block, int 
chunkIndex) - throws IOException { - final StripingChunk chunk = alignedStripe.chunks[chunkIndex]; - if (block == null) { - chunk.state = StripingChunk.MISSING; - return false; - } - if (readerInfos[chunkIndex] == null) { - if (!createBlockReader(block, chunkIndex)) { - chunk.state = StripingChunk.MISSING; - return false; - } - } else if (readerInfos[chunkIndex].shouldSkip) { - chunk.state = StripingChunk.MISSING; - return false; - } - - chunk.state = StripingChunk.PENDING; - Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader, - readerInfos[chunkIndex].datanode, - readerInfos[chunkIndex].blockReaderOffset, - alignedStripe.getOffsetInBlock(), getReadStrategies(chunk), - block.getBlock(), corruptedBlocks); - - Future<Void> request = service.submit(readCallable); - futures.put(request, chunkIndex); - return true; - } - - /** read the whole stripe. do decoding if necessary */ - void readStripe() throws IOException { - for (int i = 0; i < dataBlkNum; i++) { - if (alignedStripe.chunks[i] != null && - alignedStripe.chunks[i].state != StripingChunk.ALLZERO) { - if (!readChunk(targetBlocks[i], i)) { - alignedStripe.missingChunksNum++; - } - } - } - // There are missing block locations at this stage. Thus we need to read - // the full stripe and one more parity block. 
- if (alignedStripe.missingChunksNum > 0) { - checkMissingBlocks(); - readDataForDecoding(); - // read parity chunks - readParityChunks(alignedStripe.missingChunksNum); - } - // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks - - // Input buffers for potential decode operation, which remains null until - // first read failure - while (!futures.isEmpty()) { - try { - StripingChunkReadResult r = StripedBlockUtil - .getNextCompletedStripedRead(service, futures, 0); - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " - + alignedStripe); - } - StripingChunk returnedChunk = alignedStripe.chunks[r.index]; - Preconditions.checkNotNull(returnedChunk); - Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING); - - if (r.state == StripingChunkReadResult.SUCCESSFUL) { - returnedChunk.state = StripingChunk.FETCHED; - alignedStripe.fetchedChunksNum++; - updateState4SuccessRead(r); - if (alignedStripe.fetchedChunksNum == dataBlkNum) { - clearFutures(futures.keySet()); - break; - } - } else { - returnedChunk.state = StripingChunk.MISSING; - // close the corresponding reader - closeReader(readerInfos[r.index]); - - final int missing = alignedStripe.missingChunksNum; - alignedStripe.missingChunksNum++; - checkMissingBlocks(); - - readDataForDecoding(); - readParityChunks(alignedStripe.missingChunksNum - missing); - } - } catch (InterruptedException ie) { - String err = "Read request interrupted"; - DFSClient.LOG.error(err); - clearFutures(futures.keySet()); - // Don't decode if read interrupted - throw new InterruptedIOException(err); - } - } - - if (alignedStripe.missingChunksNum > 0) { - decode(); - } - } - } - - class PositionStripeReader extends StripeReader { - private ByteBuffer[] decodeInputs = null; - - PositionStripeReader(CompletionService<Void> service, - AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, - BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) { - 
super(service, alignedStripe, targetBlocks, readerInfos, - corruptedBlocks); - } - - @Override - void prepareDecodeInputs() { - if (decodeInputs == null) { - decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe, - dataBlkNum, parityBlkNum); - } - } - - @Override - boolean prepareParityChunk(int index) { - Preconditions.checkState(index >= dataBlkNum && - alignedStripe.chunks[index] == null); - alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]); - return true; - } - - @Override - void decode() { - StripedBlockUtil.finalizeDecodeInputs(decodeInputs, alignedStripe); - StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe, - dataBlkNum, parityBlkNum, decoder); - } - } - - class StatefulStripeReader extends StripeReader { - ByteBuffer[] decodeInputs; - - StatefulStripeReader(CompletionService<Void> service, - AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, - BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) { - super(service, alignedStripe, targetBlocks, readerInfos, - corruptedBlocks); - } - - @Override - void prepareDecodeInputs() { - if (decodeInputs == null) { - decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum]; - final ByteBuffer cur; - synchronized (DFSStripedInputStream.this) { - cur = curStripeBuf.duplicate(); - } - StripedBlockUtil.VerticalRange range = alignedStripe.range; - for (int i = 0; i < dataBlkNum; i++) { - cur.limit(cur.capacity()); - int pos = (int) (range.offsetInBlock % cellSize + cellSize * i); - cur.position(pos); - cur.limit((int) (pos + range.spanInBlock)); - decodeInputs[i] = cur.slice(); - if (alignedStripe.chunks[i] == null) { - alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]); - } - } - } - } - - @Override - boolean prepareParityChunk(int index) { - Preconditions.checkState(index >= dataBlkNum - && alignedStripe.chunks[index] == null); - if (blockReaders[index] != null && blockReaders[index].shouldSkip) { - alignedStripe.chunks[index] = new 
StripingChunk(StripingChunk.MISSING); - // we have failed the block reader before - return false; - } - final int parityIndex = index - dataBlkNum; - ByteBuffer buf = getParityBuffer().duplicate(); - buf.position(cellSize * parityIndex); - buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock); - decodeInputs[index] = buf.slice(); - alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]); - return true; - } - - @Override - void decode() { - final int span = (int) alignedStripe.getSpanInBlock(); - for (int i = 0; i < alignedStripe.chunks.length; i++) { - if (alignedStripe.chunks[i] != null && - alignedStripe.chunks[i].state == StripingChunk.ALLZERO) { - for (int j = 0; j < span; j++) { - decodeInputs[i].put((byte) 0); - } - decodeInputs[i].flip(); - } else if (alignedStripe.chunks[i] != null && - alignedStripe.chunks[i].state == StripingChunk.FETCHED) { - decodeInputs[i].position(0); - decodeInputs[i].limit(span); - } - } - int[] decodeIndices = new int[parityBlkNum]; - int pos = 0; - for (int i = 0; i < alignedStripe.chunks.length; i++) { - if (alignedStripe.chunks[i] != null && - alignedStripe.chunks[i].state == StripingChunk.MISSING) { - if (i < dataBlkNum) { - decodeIndices[pos++] = i; - } else { - decodeInputs[i] = null; - } - } - } - decodeIndices = Arrays.copyOf(decodeIndices, pos); - - final int decodeChunkNum = decodeIndices.length; - ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum]; - for (int i = 0; i < decodeChunkNum; i++) { - outputs[i] = decodeInputs[decodeIndices[i]]; - outputs[i].position(0); - outputs[i].limit((int) alignedStripe.range.spanInBlock); - decodeInputs[decodeIndices[i]] = null; - } - - decoder.decode(decodeInputs, decodeIndices, outputs); - } - } - - /** * May need online read recovery, zero-copy read doesn't make * sense, so don't support it. 
*/ @@ -957,12 +519,4 @@ public class DFSStripedInputStream extends DFSInputStream { throw new UnsupportedOperationException( "Not support enhanced byte buffer access."); } - - /** A variation to {@link DFSInputStream#cancelAll} */ - private void clearFutures(Collection<Future<Void>> futures) { - for (Future<Void> future : futures) { - future.cancel(false); - } - futures.clear(); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java new file mode 100644 index 0000000..5818291 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import com.google.common.base.Preconditions; +import org.apache.commons.configuration.SystemConfiguration; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; +import org.apache.hadoop.io.erasurecode.ECChunk; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; + +import java.nio.ByteBuffer; + +/** + * The reader for reading a complete {@link StripedBlockUtil.AlignedStripe} + * which may cross multiple stripes with cellSize width. + */ +class PositionStripeReader extends StripeReader { + private ByteBuffer codingBuffer; + + PositionStripeReader(AlignedStripe alignedStripe, + ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks, + BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks, + RawErasureDecoder decoder, DFSStripedInputStream dfsStripedInputStream) { + super(alignedStripe, ecPolicy, targetBlocks, readerInfos, + corruptedBlocks, decoder, dfsStripedInputStream); + } + + @Override + void prepareDecodeInputs() { + if (codingBuffer == null) { + this.decodeInputs = new ECChunk[dataBlkNum + parityBlkNum]; + initDecodeInputs(alignedStripe); + } + } + + @Override + boolean prepareParityChunk(int index) { + Preconditions.checkState(index >= dataBlkNum && + alignedStripe.chunks[index] == null); + + alignedStripe.chunks[index] = + new StripingChunk(decodeInputs[index].getBuffer()); + + return true; + } + + @Override + void decode() { + finalizeDecodeInputs(); + decodeAndFillBuffer(true); + } + + void initDecodeInputs(AlignedStripe alignedStripe) { + int bufLen = (int) alignedStripe.getSpanInBlock(); + int bufCount = dataBlkNum + parityBlkNum; + codingBuffer = 
dfsStripedInputStream.getBufferPool(). + getBuffer(useDirectBuffer(), bufLen * bufCount); + ByteBuffer buffer; + for (int i = 0; i < decodeInputs.length; i++) { + buffer = codingBuffer.duplicate(); + decodeInputs[i] = new ECChunk(buffer, i * bufLen, bufLen); + } + + for (int i = 0; i < dataBlkNum; i++) { + if (alignedStripe.chunks[i] == null) { + alignedStripe.chunks[i] = + new StripingChunk(decodeInputs[i].getBuffer()); + } + } + } + + void close() { + if (decodeInputs != null) { + for (int i = 0; i < decodeInputs.length; i++) { + decodeInputs[i] = null; + } + } + + if (codingBuffer != null) { + dfsStripedInputStream.getBufferPool().putBuffer(codingBuffer); + codingBuffer = null; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java new file mode 100644 index 0000000..8879514 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; +import org.apache.hadoop.io.erasurecode.ECChunk; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; + +import java.nio.ByteBuffer; + +/** + * The reader for reading a complete {@link StripedBlockUtil.AlignedStripe} + * which belongs to a single stripe. + * Reading across multiple stripes is not supported in this reader.
+ */ +class StatefulStripeReader extends StripeReader { + + StatefulStripeReader(AlignedStripe alignedStripe, + ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks, + BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks, + RawErasureDecoder decoder, DFSStripedInputStream dfsStripedInputStream) { + super(alignedStripe, ecPolicy, targetBlocks, readerInfos, + corruptedBlocks, decoder, dfsStripedInputStream); + } + + @Override + void prepareDecodeInputs() { + final ByteBuffer cur; + synchronized (dfsStripedInputStream) { + cur = dfsStripedInputStream.getCurStripeBuf().duplicate(); + } + + this.decodeInputs = new ECChunk[dataBlkNum + parityBlkNum]; + int bufLen = (int) alignedStripe.getSpanInBlock(); + int bufOff = (int) alignedStripe.getOffsetInBlock(); + for (int i = 0; i < dataBlkNum; i++) { + cur.limit(cur.capacity()); + int pos = bufOff % cellSize + cellSize * i; + cur.position(pos); + cur.limit(pos + bufLen); + decodeInputs[i] = new ECChunk(cur.slice(), 0, bufLen); + if (alignedStripe.chunks[i] == null) { + alignedStripe.chunks[i] = + new StripingChunk(decodeInputs[i].getBuffer()); + } + } + } + + @Override + boolean prepareParityChunk(int index) { + Preconditions.checkState(index >= dataBlkNum + && alignedStripe.chunks[index] == null); + if (readerInfos[index] != null && readerInfos[index].shouldSkip) { + alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING); + // we have failed the block reader before + return false; + } + final int parityIndex = index - dataBlkNum; + ByteBuffer buf = dfsStripedInputStream.getParityBuffer().duplicate(); + buf.position(cellSize * parityIndex); + buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock); + decodeInputs[index] = + new ECChunk(buf.slice(), 0, (int) alignedStripe.range.spanInBlock); + alignedStripe.chunks[index] = + new StripingChunk(decodeInputs[index].getBuffer()); + return true; + } + + @Override + void decode() { + finalizeDecodeInputs(); + 
decodeAndFillBuffer(false); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java new file mode 100644 index 0000000..5518752 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java @@ -0,0 +1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; +import org.apache.hadoop.io.erasurecode.ECChunk; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; + +/** + * The reader for reading a complete {@link StripedBlockUtil.AlignedStripe}. + * Note that an {@link StripedBlockUtil.AlignedStripe} may cross multiple + * stripes with cellSize width. 
+ */ +abstract class StripeReader { + + static class ReaderRetryPolicy { + private int fetchEncryptionKeyTimes = 1; + private int fetchTokenTimes = 1; + + void refetchEncryptionKey() { + fetchEncryptionKeyTimes--; + } + + void refetchToken() { + fetchTokenTimes--; + } + + boolean shouldRefetchEncryptionKey() { + return fetchEncryptionKeyTimes > 0; + } + + boolean shouldRefetchToken() { + return fetchTokenTimes > 0; + } + } + + static class BlockReaderInfo { + final BlockReader reader; + final DatanodeInfo datanode; + /** + * when initializing block readers, their starting offsets are set to the + * same number: the smallest internal block offsets among all the readers. + * This is because it is possible that for some internal blocks we have to + * read "backwards" for decoding purpose. We thus use this offset array to + * track offsets for all the block readers so that we can skip data if + * necessary. + */ + long blockReaderOffset; + /** + * We use this field to indicate whether we should use this reader. In case + * we hit any issue with this reader, we set this field to true and avoid + * using it for the next stripe. 
+ */ + boolean shouldSkip = false; + + BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) { + this.reader = reader; + this.datanode = dn; + this.blockReaderOffset = offset; + } + + void setOffset(long offset) { + this.blockReaderOffset = offset; + } + + void skip() { + this.shouldSkip = true; + } + } + + protected final Map<Future<Void>, Integer> futures = new HashMap<>(); + protected final AlignedStripe alignedStripe; + protected final CompletionService<Void> service; + protected final LocatedBlock[] targetBlocks; + protected final CorruptedBlocks corruptedBlocks; + protected final BlockReaderInfo[] readerInfos; + protected final ErasureCodingPolicy ecPolicy; + protected final short dataBlkNum; + protected final short parityBlkNum; + protected final int cellSize; + protected final RawErasureDecoder decoder; + protected final DFSStripedInputStream dfsStripedInputStream; + + protected ECChunk[] decodeInputs; + + StripeReader(AlignedStripe alignedStripe, + ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks, + BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks, + RawErasureDecoder decoder, + DFSStripedInputStream dfsStripedInputStream) { + this.alignedStripe = alignedStripe; + this.ecPolicy = ecPolicy; + this.dataBlkNum = (short)ecPolicy.getNumDataUnits(); + this.parityBlkNum = (short)ecPolicy.getNumParityUnits(); + this.cellSize = ecPolicy.getCellSize(); + this.targetBlocks = targetBlocks; + this.readerInfos = readerInfos; + this.corruptedBlocks = corruptedBlocks; + this.decoder = decoder; + this.dfsStripedInputStream = dfsStripedInputStream; + + service = new ExecutorCompletionService<>( + dfsStripedInputStream.getStripedReadsThreadPool()); + } + + /** + * Prepare all the data chunks. + */ + abstract void prepareDecodeInputs(); + + /** + * Prepare the parity chunk and block reader if necessary. + */ + abstract boolean prepareParityChunk(int index); + + /* + * Decode to get the missing data. 
+ */ + abstract void decode(); + + /* + * Default close do nothing. + */ + void close() { + } + + void updateState4SuccessRead(StripingChunkReadResult result) { + Preconditions.checkArgument( + result.state == StripingChunkReadResult.SUCCESSFUL); + readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock() + + alignedStripe.getSpanInBlock()); + } + + private void checkMissingBlocks() throws IOException { + if (alignedStripe.missingChunksNum > parityBlkNum) { + clearFutures(); + throw new IOException(alignedStripe.missingChunksNum + + " missing blocks, the stripe is: " + alignedStripe + + "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks()); + } + } + + /** + * We need decoding. Thus go through all the data chunks and make sure we + * submit read requests for all of them. + */ + private void readDataForDecoding() throws IOException { + prepareDecodeInputs(); + for (int i = 0; i < dataBlkNum; i++) { + Preconditions.checkNotNull(alignedStripe.chunks[i]); + if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) { + if (!readChunk(targetBlocks[i], i)) { + alignedStripe.missingChunksNum++; + } + } + } + checkMissingBlocks(); + } + + void readParityChunks(int num) throws IOException { + for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num; + i++) { + if (alignedStripe.chunks[i] == null) { + if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) { + j++; + } else { + alignedStripe.missingChunksNum++; + } + } + } + checkMissingBlocks(); + } + + private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) { + if (chunk.useByteBuffer()) { + ByteBufferStrategy strategy = new ByteBufferStrategy( + chunk.getByteBuffer(), dfsStripedInputStream.getReadStatistics(), + dfsStripedInputStream.getDFSClient()); + return new ByteBufferStrategy[]{strategy}; + } + + ByteBufferStrategy[] strategies = + new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()]; + for (int i = 0; i < strategies.length; i++) { + ByteBuffer 
buffer = chunk.getChunkBuffer().getSlice(i); + strategies[i] = new ByteBufferStrategy(buffer, + dfsStripedInputStream.getReadStatistics(), + dfsStripedInputStream.getDFSClient()); + } + return strategies; + } + + private int readToBuffer(BlockReader blockReader, + DatanodeInfo currentNode, ByteBufferStrategy strategy, + ExtendedBlock currentBlock) throws IOException { + final int targetLength = strategy.getTargetLength(); + int length = 0; + try { + while (length < targetLength) { + int ret = strategy.readFromBlock(blockReader); + if (ret < 0) { + throw new IOException("Unexpected EOS from the reader"); + } + length += ret; + } + return length; + } catch (ChecksumException ce) { + DFSClient.LOG.warn("Found Checksum error for " + + currentBlock + " from " + currentNode + + " at " + ce.getPos()); + // we want to remember which block replicas we have tried + corruptedBlocks.addCorruptedBlock(currentBlock, currentNode); + throw ce; + } catch (IOException e) { + DFSClient.LOG.warn("Exception while reading from " + + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from " + + currentNode, e); + throw e; + } + } + + private Callable<Void> readCells(final BlockReader reader, + final DatanodeInfo datanode, final long currentReaderOffset, + final long targetReaderOffset, final ByteBufferStrategy[] strategies, + final ExtendedBlock currentBlock) { + return () -> { + // reader can be null if getBlockReaderWithRetry failed or + // the reader hit exception before + if (reader == null) { + throw new IOException("The BlockReader is null. 
" + + "The BlockReader creation failed or the reader hit exception."); + } + Preconditions.checkState(currentReaderOffset <= targetReaderOffset); + if (currentReaderOffset < targetReaderOffset) { + long skipped = reader.skip(targetReaderOffset - currentReaderOffset); + Preconditions.checkState( + skipped == targetReaderOffset - currentReaderOffset); + } + + for (ByteBufferStrategy strategy : strategies) { + readToBuffer(reader, datanode, strategy, currentBlock); + } + return null; + }; + } + + boolean readChunk(final LocatedBlock block, int chunkIndex) + throws IOException { + final StripingChunk chunk = alignedStripe.chunks[chunkIndex]; + if (block == null) { + chunk.state = StripingChunk.MISSING; + return false; + } + + if (readerInfos[chunkIndex] == null) { + if (!dfsStripedInputStream.createBlockReader(block, + alignedStripe.getOffsetInBlock(), targetBlocks, + readerInfos, chunkIndex)) { + chunk.state = StripingChunk.MISSING; + return false; + } + } else if (readerInfos[chunkIndex].shouldSkip) { + chunk.state = StripingChunk.MISSING; + return false; + } + + chunk.state = StripingChunk.PENDING; + Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader, + readerInfos[chunkIndex].datanode, + readerInfos[chunkIndex].blockReaderOffset, + alignedStripe.getOffsetInBlock(), getReadStrategies(chunk), + block.getBlock()); + + Future<Void> request = service.submit(readCallable); + futures.put(request, chunkIndex); + return true; + } + + /** + * read the whole stripe. do decoding if necessary + */ + void readStripe() throws IOException { + for (int i = 0; i < dataBlkNum; i++) { + if (alignedStripe.chunks[i] != null && + alignedStripe.chunks[i].state != StripingChunk.ALLZERO) { + if (!readChunk(targetBlocks[i], i)) { + alignedStripe.missingChunksNum++; + } + } + } + // There are missing block locations at this stage. Thus we need to read + // the full stripe and one more parity block. 
+ if (alignedStripe.missingChunksNum > 0) { + checkMissingBlocks(); + readDataForDecoding(); + // read parity chunks + readParityChunks(alignedStripe.missingChunksNum); + } + // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks + + // Input buffers for potential decode operation, which remains null until + // first read failure + while (!futures.isEmpty()) { + try { + StripingChunkReadResult r = StripedBlockUtil + .getNextCompletedStripedRead(service, futures, 0); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " + + alignedStripe); + } + StripingChunk returnedChunk = alignedStripe.chunks[r.index]; + Preconditions.checkNotNull(returnedChunk); + Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING); + + if (r.state == StripingChunkReadResult.SUCCESSFUL) { + returnedChunk.state = StripingChunk.FETCHED; + alignedStripe.fetchedChunksNum++; + updateState4SuccessRead(r); + if (alignedStripe.fetchedChunksNum == dataBlkNum) { + clearFutures(); + break; + } + } else { + returnedChunk.state = StripingChunk.MISSING; + // close the corresponding reader + dfsStripedInputStream.closeReader(readerInfos[r.index]); + + final int missing = alignedStripe.missingChunksNum; + alignedStripe.missingChunksNum++; + checkMissingBlocks(); + + readDataForDecoding(); + readParityChunks(alignedStripe.missingChunksNum - missing); + } + } catch (InterruptedException ie) { + String err = "Read request interrupted"; + DFSClient.LOG.error(err); + clearFutures(); + // Don't decode if read interrupted + throw new InterruptedIOException(err); + } + } + + if (alignedStripe.missingChunksNum > 0) { + decode(); + } + } + + /** + * Some fetched {@link StripingChunk} might be stored in original application + * buffer instead of prepared decode input buffers. Some others are beyond + * the range of the internal blocks and should correspond to all zero bytes. 
+ * When all pending requests have returned, this method should be called to + * finalize decode input buffers. + */ + + void finalizeDecodeInputs() { + for (int i = 0; i < alignedStripe.chunks.length; i++) { + final StripingChunk chunk = alignedStripe.chunks[i]; + if (chunk != null && chunk.state == StripingChunk.FETCHED) { + if (chunk.useChunkBuffer()) { + chunk.getChunkBuffer().copyTo(decodeInputs[i].getBuffer()); + } else { + chunk.getByteBuffer().flip(); + } + } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) { + decodeInputs[i].setAllZero(true); + } + } + } + + /** + * Decode based on the given input buffers and erasure coding policy. + */ + void decodeAndFillBuffer(boolean fillBuffer) { + // Step 1: prepare indices and output buffers for missing data units + int[] decodeIndices = prepareErasedIndices(); + + final int decodeChunkNum = decodeIndices.length; + ECChunk[] outputs = new ECChunk[decodeChunkNum]; + for (int i = 0; i < decodeChunkNum; i++) { + outputs[i] = decodeInputs[decodeIndices[i]]; + decodeInputs[decodeIndices[i]] = null; + } + // Step 2: decode into prepared output buffers + decoder.decode(decodeInputs, decodeIndices, outputs); + + // Step 3: fill original application buffer with decoded data + if (fillBuffer) { + for (int i = 0; i < decodeIndices.length; i++) { + int missingBlkIdx = decodeIndices[i]; + StripingChunk chunk = alignedStripe.chunks[missingBlkIdx]; + if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) { + chunk.getChunkBuffer().copyFrom(outputs[i].getBuffer()); + } + } + } + } + + /** + * Prepare erased indices. 
+ */ + int[] prepareErasedIndices() { + int[] decodeIndices = new int[parityBlkNum]; + int pos = 0; + for (int i = 0; i < alignedStripe.chunks.length; i++) { + if (alignedStripe.chunks[i] != null && + alignedStripe.chunks[i].state == StripingChunk.MISSING){ + decodeIndices[pos++] = i; + } + } + + int[] erasedIndices = Arrays.copyOf(decodeIndices, pos); + return erasedIndices; + } + + void clearFutures() { + for (Future<Void> future : futures.keySet()) { + future.cancel(false); + } + futures.clear(); + } + + boolean useDirectBuffer() { + return decoder.preferDirectBuffer(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 4dbbc3d..896ebc6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.DFSStripedOutputStream; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -32,7 +31,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; -import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; 
+import org.apache.hadoop.hdfs.DFSStripedOutputStream; import org.apache.hadoop.security.token.Token; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,18 +76,6 @@ public class StripedBlockUtil { LoggerFactory.getLogger(StripedBlockUtil.class); /** - * Parses a striped block group into individual blocks. - * @param bg The striped block group - * @param ecPolicy The erasure coding policy - * @return An array of the blocks in the group - */ - public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg, - ErasureCodingPolicy ecPolicy) { - return parseStripedBlockGroup(bg, ecPolicy.getCellSize(), - ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()); - } - - /** * This method parses a striped block group into individual blocks. * * @param bg The striped block group @@ -112,7 +99,7 @@ public class StripedBlockUtil { } /** - * This method creates an internal block at the given index of a block group + * This method creates an internal block at the given index of a block group. * * @param idxInReturnedLocs The index in the stored locations in the * {@link LocatedStripedBlock} object @@ -169,7 +156,7 @@ public class StripedBlockUtil { } /** - * Get the size of an internal block at the given index of a block group + * Get the size of an internal block at the given index of a block group. * * @param dataSize Size of the block group only counting data blocks * @param cellSize The size of a striping cell @@ -237,7 +224,7 @@ public class StripedBlockUtil { /** * Given a byte's offset in an internal block, calculate the offset in - * the block group + * the block group. */ public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum, long offsetInBlk, int idxInBlockGroup) { @@ -248,12 +235,12 @@ public class StripedBlockUtil { } /** - * Get the next completed striped read task + * Get the next completed striped read task. 
* - * @return {@link StripingChunkReadResult} indicating the status of the read task - * succeeded, and the block index of the task. If the method times - * out without getting any completed read tasks, -1 is returned as - * block index. + * @return {@link StripingChunkReadResult} indicating the status of the read + * task succeeded, and the block index of the task. If the method + * times out without getting any completed read tasks, -1 is + * returned as block index. * @throws InterruptedException */ public static StripingChunkReadResult getNextCompletedStripedRead( @@ -287,7 +274,7 @@ public class StripedBlockUtil { /** * Get the total usage of the striped blocks, which is the total of data - * blocks and parity blocks + * blocks and parity blocks. * * @param numDataBlkBytes * Size of the block group only counting data blocks @@ -308,91 +295,6 @@ public class StripedBlockUtil { } /** - * Initialize the decoding input buffers based on the chunk states in an - * {@link AlignedStripe}. For each chunk that was not initially requested, - * schedule a new fetch request with the decoding input buffer as transfer - * destination. - */ - public static ByteBuffer[] initDecodeInputs(AlignedStripe alignedStripe, - int dataBlkNum, int parityBlkNum) { - ByteBuffer[] decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum]; - for (int i = 0; i < decodeInputs.length; i++) { - decodeInputs[i] = ByteBuffer.allocate( - (int) alignedStripe.getSpanInBlock()); - } - // read the full data aligned stripe - for (int i = 0; i < dataBlkNum; i++) { - if (alignedStripe.chunks[i] == null) { - alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]); - } - } - return decodeInputs; - } - - /** - * Some fetched {@link StripingChunk} might be stored in original application - * buffer instead of prepared decode input buffers. Some others are beyond - * the range of the internal blocks and should correspond to all zero bytes. 
- * When all pending requests have returned, this method should be called to - * finalize decode input buffers. - */ - public static void finalizeDecodeInputs(final ByteBuffer[] decodeInputs, - AlignedStripe alignedStripe) { - for (int i = 0; i < alignedStripe.chunks.length; i++) { - final StripingChunk chunk = alignedStripe.chunks[i]; - if (chunk != null && chunk.state == StripingChunk.FETCHED) { - if (chunk.useChunkBuffer()) { - chunk.getChunkBuffer().copyTo(decodeInputs[i]); - } else { - chunk.getByteBuffer().flip(); - } - } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) { - //ZERO it. Will be better handled in other following issue. - byte[] emptyBytes = new byte[decodeInputs[i].limit()]; - decodeInputs[i].put(emptyBytes); - decodeInputs[i].flip(); - } else { - decodeInputs[i] = null; - } - } - } - - /** - * Decode based on the given input buffers and erasure coding policy. - */ - public static void decodeAndFillBuffer(final ByteBuffer[] decodeInputs, - AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum, - RawErasureDecoder decoder) { - // Step 1: prepare indices and output buffers for missing data units - int[] decodeIndices = new int[parityBlkNum]; - int pos = 0; - for (int i = 0; i < dataBlkNum; i++) { - if (alignedStripe.chunks[i] != null && - alignedStripe.chunks[i].state == StripingChunk.MISSING){ - decodeIndices[pos++] = i; - } - } - decodeIndices = Arrays.copyOf(decodeIndices, pos); - ByteBuffer[] decodeOutputs = new ByteBuffer[decodeIndices.length]; - for (int i = 0; i < decodeOutputs.length; i++) { - decodeOutputs[i] = ByteBuffer.allocate( - (int) alignedStripe.getSpanInBlock()); - } - - // Step 2: decode into prepared output buffers - decoder.decode(decodeInputs, decodeIndices, decodeOutputs); - - // Step 3: fill original application buffer with decoded data - for (int i = 0; i < decodeIndices.length; i++) { - int missingBlkIdx = decodeIndices[i]; - StripingChunk chunk = alignedStripe.chunks[missingBlkIdx]; - if 
(chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) { - chunk.getChunkBuffer().copyFrom(decodeOutputs[i]); - } - } - } - - /** * Similar functionality with {@link #divideByteRangeIntoStripes}, but is used * by stateful read and uses ByteBuffer as reading target buffer. Besides the * read range is within a single stripe thus the calculation logic is simpler. @@ -485,7 +387,7 @@ public class StripedBlockUtil { /** * Map the logical byte range to a set of inclusive {@link StripingCell} * instances, each representing the overlap of the byte range to a cell - * used by {@link DFSStripedOutputStream} in encoding + * used by {@link DFSStripedOutputStream} in encoding. */ @VisibleForTesting private static StripingCell[] getStripingCellsOfByteRange( @@ -530,7 +432,7 @@ public class StripedBlockUtil { int dataBlkNum = ecPolicy.getNumDataUnits(); int parityBlkNum = ecPolicy.getNumParityUnits(); - VerticalRange ranges[] = new VerticalRange[dataBlkNum + parityBlkNum]; + VerticalRange[] ranges = new VerticalRange[dataBlkNum + parityBlkNum]; long earliestStart = Long.MAX_VALUE; long latestEnd = -1; @@ -675,7 +577,7 @@ public class StripedBlockUtil { @VisibleForTesting static class StripingCell { final ErasureCodingPolicy ecPolicy; - /** Logical order in a block group, used when doing I/O to a block group */ + /** Logical order in a block group, used when doing I/O to a block group. */ final int idxInBlkGroup; final int idxInInternalBlk; final int idxInStripe; @@ -738,7 +640,7 @@ public class StripedBlockUtil { */ public static class AlignedStripe { public VerticalRange range; - /** status of each chunk in the stripe */ + /** status of each chunk in the stripe. */ public final StripingChunk[] chunks; public int fetchedChunksNum = 0; public int missingChunksNum = 0; @@ -790,9 +692,9 @@ public class StripedBlockUtil { * +-----+ */ public static class VerticalRange { - /** start offset in the block group (inclusive) */ + /** start offset in the block group (inclusive). 
*/ public long offsetInBlock; - /** length of the stripe range */ + /** length of the stripe range. */ public long spanInBlock; public VerticalRange(long offsetInBlock, long length) { @@ -801,7 +703,7 @@ public class StripedBlockUtil { this.spanInBlock = length; } - /** whether a position is in the range */ + /** whether a position is in the range. */ public boolean include(long pos) { return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock; } @@ -915,7 +817,7 @@ public class StripedBlockUtil { /** * Note: target will be ready-to-read state after the call. */ - void copyTo(ByteBuffer target) { + public void copyTo(ByteBuffer target) { for (ByteBuffer slice : slices) { slice.flip(); target.put(slice); @@ -923,7 +825,7 @@ public class StripedBlockUtil { target.flip(); } - void copyFrom(ByteBuffer src) { + public void copyFrom(ByteBuffer src) { ByteBuffer tmp; int len; for (ByteBuffer slice : slices) { @@ -970,6 +872,28 @@ public class StripedBlockUtil { } } + /** Used to indicate the buffered data's range in the block group. */ + public static class StripeRange { + /** start offset in the block group (inclusive). */ + final long offsetInBlock; + /** length of the stripe range. */ + final long length; + + public StripeRange(long offsetInBlock, long length) { + Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0); + this.offsetInBlock = offsetInBlock; + this.length = length; + } + + public boolean include(long pos) { + return pos >= offsetInBlock && pos < offsetInBlock + length; + } + + public long getLength() { + return length; + } + } + /** * Check if the information such as IDs and generation stamps in block-i * match the block group. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/734d54c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java index 7d9d7dc..999eb1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java @@ -283,5 +283,4 @@ public class TestStripedBlockUtil { } } } - } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org