http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java new file mode 100644 index 0000000..3612063 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -0,0 +1,939 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.io.ByteBufferPool; + +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; + +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.ECSchema; + +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.util.DirectBufferPool; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Set; +import java.util.Collection; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +/** + * DFSStripedInputStream reads from striped block groups + */ +public class DFSStripedInputStream extends DFSInputStream { + + private static class ReaderRetryPolicy { + private int fetchEncryptionKeyTimes = 1; + private int fetchTokenTimes = 1; + + void refetchEncryptionKey() { + fetchEncryptionKeyTimes--; + } + + void refetchToken() { + fetchTokenTimes--; + } + + boolean 
shouldRefetchEncryptionKey() { + return fetchEncryptionKeyTimes > 0; + } + + boolean shouldRefetchToken() { + return fetchTokenTimes > 0; + } + } + + /** Used to indicate the buffered data's range in the block group */ + private static class StripeRange { + /** start offset in the block group (inclusive) */ + final long offsetInBlock; + /** length of the stripe range */ + final long length; + + StripeRange(long offsetInBlock, long length) { + Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0); + this.offsetInBlock = offsetInBlock; + this.length = length; + } + + boolean include(long pos) { + return pos >= offsetInBlock && pos < offsetInBlock + length; + } + } + + private static class BlockReaderInfo { + final BlockReader reader; + final DatanodeInfo datanode; + /** + * when initializing block readers, their starting offsets are set to the same + * number: the smallest internal block offsets among all the readers. This is + * because it is possible that for some internal blocks we have to read + * "backwards" for decoding purpose. We thus use this offset array to track + * offsets for all the block readers so that we can skip data if necessary. + */ + long blockReaderOffset; + LocatedBlock targetBlock; + /** + * We use this field to indicate whether we should use this reader. In case + * we hit any issue with this reader, we set this field to true and avoid + * using it for the next stripe. + */ + boolean shouldSkip = false; + + BlockReaderInfo(BlockReader reader, LocatedBlock targetBlock, + DatanodeInfo dn, long offset) { + this.reader = reader; + this.targetBlock = targetBlock; + this.datanode = dn; + this.blockReaderOffset = offset; + } + + void setOffset(long offset) { + this.blockReaderOffset = offset; + } + + void skip() { + this.shouldSkip = true; + } + } + + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + + private final BlockReaderInfo[] blockReaders; + private final int cellSize; + private final short dataBlkNum; + private final short parityBlkNum; + private final int groupSize; + /** the buffer for a complete stripe */ + private ByteBuffer curStripeBuf; + private ByteBuffer parityBuf; + private final ECSchema schema; + private final RawErasureDecoder decoder; + + /** + * indicate the start/end offset of the current buffered stripe in the + * block group + */ + private StripeRange curStripeRange; + private final CompletionService<Void> readingService; + + DFSStripedInputStream(DFSClient dfsClient, String src, + boolean verifyChecksum, ECSchema schema, int cellSize, + LocatedBlocks locatedBlocks) throws IOException { + super(dfsClient, src, verifyChecksum, locatedBlocks); + + assert schema != null; + this.schema = schema; + this.cellSize = cellSize; + dataBlkNum = (short) schema.getNumDataUnits(); + parityBlkNum = (short) schema.getNumParityUnits(); + groupSize = dataBlkNum + parityBlkNum; + blockReaders = new BlockReaderInfo[groupSize]; + curStripeRange = new StripeRange(0, 0); + readingService = + new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool()); + decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(), + dataBlkNum, parityBlkNum); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Creating an striped input stream for file " + src); + } + } + + private void resetCurStripeBuffer() { + if (curStripeBuf == null) { + curStripeBuf = bufferPool.getBuffer(cellSize * dataBlkNum); + } + curStripeBuf.clear(); + curStripeRange = new StripeRange(0, 0); + } + + private ByteBuffer getParityBuffer() { + if 
(parityBuf == null) { + parityBuf = bufferPool.getBuffer(cellSize * parityBlkNum); + } + parityBuf.clear(); + return parityBuf; + } + + /** + * When seeking into a new block group, create blockReader for each internal + * block in the group. + */ + private synchronized void blockSeekTo(long target) throws IOException { + if (target >= getFileLength()) { + throw new IOException("Attempted to read past end of file"); + } + + // Will be getting a new BlockReader. + closeCurrentBlockReaders(); + + // Compute desired striped block group + LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target); + // Update current position + this.pos = target; + this.blockEnd = targetBlockGroup.getStartOffset() + + targetBlockGroup.getBlockSize() - 1; + currentLocatedBlock = targetBlockGroup; + } + + @Override + public synchronized void close() throws IOException { + super.close(); + if (curStripeBuf != null) { + bufferPool.returnBuffer(curStripeBuf); + curStripeBuf = null; + } + if (parityBuf != null) { + bufferPool.returnBuffer(parityBuf); + parityBuf = null; + } + } + + /** + * Extend the super method with the logic of switching between cells. + * When reaching the end of a cell, proceed to the next cell and read it + * with the next blockReader. + */ + @Override + protected void closeCurrentBlockReaders() { + resetCurStripeBuffer(); + if (blockReaders == null || blockReaders.length == 0) { + return; + } + for (int i = 0; i < groupSize; i++) { + closeReader(blockReaders[i]); + blockReaders[i] = null; + } + blockEnd = -1; + } + + private void closeReader(BlockReaderInfo readerInfo) { + if (readerInfo != null) { + IOUtils.cleanup(DFSClient.LOG, readerInfo.reader); + readerInfo.skip(); + } + } + + private long getOffsetInBlockGroup() { + return getOffsetInBlockGroup(pos); + } + + private long getOffsetInBlockGroup(long pos) { + return pos - currentLocatedBlock.getStartOffset(); + } + + /** + * Read a new stripe covering the current position, and store the data in the + * {@link #curStripeBuf}. 
+ */ + private void readOneStripe( + Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) + throws IOException { + resetCurStripeBuffer(); + + // compute stripe range based on pos + final long offsetInBlockGroup = getOffsetInBlockGroup(); + final long stripeLen = cellSize * dataBlkNum; + final int stripeIndex = (int) (offsetInBlockGroup / stripeLen); + final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen); + final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize() + - (stripeIndex * stripeLen), stripeLen); + StripeRange stripeRange = new StripeRange(offsetInBlockGroup, + stripeLimit - stripeBufOffset); + + LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock; + AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, cellSize, + blockGroup, offsetInBlockGroup, + offsetInBlockGroup + stripeRange.length - 1, curStripeBuf); + final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( + blockGroup, cellSize, dataBlkNum, parityBlkNum); + // read the whole stripe + for (AlignedStripe stripe : stripes) { + // Parse group to get chosen DN location + StripeReader sreader = new StatefulStripeReader(readingService, stripe, + blks, blockReaders, corruptedBlockMap); + sreader.readStripe(); + } + curStripeBuf.position(stripeBufOffset); + curStripeBuf.limit(stripeLimit); + curStripeRange = stripeRange; + } + + private Callable<Void> readCells(final BlockReader reader, + final DatanodeInfo datanode, final long currentReaderOffset, + final long targetReaderOffset, final ByteBufferStrategy[] strategies, + final ExtendedBlock currentBlock, + final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + // reader can be null if getBlockReaderWithRetry failed or + // the reader hit exception before + if (reader == null) { + throw new IOException("The BlockReader is null. 
" + + "The BlockReader creation failed or the reader hit exception."); + } + Preconditions.checkState(currentReaderOffset <= targetReaderOffset); + if (currentReaderOffset < targetReaderOffset) { + long skipped = reader.skip(targetReaderOffset - currentReaderOffset); + Preconditions.checkState( + skipped == targetReaderOffset - currentReaderOffset); + } + int result = 0; + for (ByteBufferStrategy strategy : strategies) { + result += readToBuffer(reader, datanode, strategy, currentBlock, + corruptedBlockMap); + } + return null; + } + }; + } + + private int readToBuffer(BlockReader blockReader, + DatanodeInfo currentNode, ByteBufferStrategy strategy, + ExtendedBlock currentBlock, + Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) + throws IOException { + final int targetLength = strategy.buf.remaining(); + int length = 0; + try { + while (length < targetLength) { + int ret = strategy.doRead(blockReader, 0, 0); + if (ret < 0) { + throw new IOException("Unexpected EOS from the reader"); + } + length += ret; + } + return length; + } catch (ChecksumException ce) { + DFSClient.LOG.warn("Found Checksum error for " + + currentBlock + " from " + currentNode + + " at " + ce.getPos()); + // we want to remember which block replicas we have tried + addIntoCorruptedBlockMap(currentBlock, currentNode, + corruptedBlockMap); + throw ce; + } catch (IOException e) { + DFSClient.LOG.warn("Exception while reading from " + + currentBlock + " of " + src + " from " + + currentNode, e); + throw e; + } + } + + /** + * Seek to a new arbitrary location + */ + @Override + public synchronized void seek(long targetPos) throws IOException { + if (targetPos > getFileLength()) { + throw new EOFException("Cannot seek after EOF"); + } + if (targetPos < 0) { + throw new EOFException("Cannot seek to negative offset"); + } + if (closed.get()) { + throw new IOException("Stream is closed!"); + } + if (targetPos <= blockEnd) { + final long targetOffsetInBlk = getOffsetInBlockGroup(targetPos); + if (curStripeRange.include(targetOffsetInBlk)) { + int bufOffset = getStripedBufOffset(targetOffsetInBlk); + curStripeBuf.position(bufOffset); + pos = targetPos; + return; + } + } + pos = targetPos; + blockEnd = -1; + } + + private int getStripedBufOffset(long offsetInBlockGroup) { + final long stripeLen = cellSize * dataBlkNum; + // compute the position in the curStripeBuf based on "pos" + return (int) (offsetInBlockGroup % stripeLen); + } + + @Override + public synchronized boolean seekToNewSource(long targetPos) + throws IOException { + return false; + } + + @Override + protected synchronized int readWithStrategy(ReaderStrategy strategy, + int off, int len) throws IOException { + dfsClient.checkOpen(); + if (closed.get()) { + throw new IOException("Stream closed"); + } + Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap = + new ConcurrentHashMap<>(); + if (pos < getFileLength()) { + try { + if (pos > blockEnd) { + blockSeekTo(pos); + } + int realLen = (int) Math.min(len, (blockEnd - pos + 1L)); + synchronized (infoLock) { + if (locatedBlocks.isLastBlockComplete()) { + realLen = (int) Math.min(realLen, + locatedBlocks.getFileLength() - pos); + } + } + + /** Number of bytes already read into buffer */ + int result = 0; + while (result < realLen) { + if (!curStripeRange.include(getOffsetInBlockGroup())) { + readOneStripe(corruptedBlockMap); + } + int ret = copyToTargetBuf(strategy, off + result, realLen - result); + result += ret; + pos += ret; + } + if (dfsClient.stats != null) { + dfsClient.stats.incrementBytesRead(result); + 
} + return result; + } finally { + // Check if need to report block replicas corruption either read + // was successful or ChecksumException occured. + reportCheckSumFailure(corruptedBlockMap, + currentLocatedBlock.getLocations().length); + } + } + return -1; + } + + /** + * Copy the data from {@link #curStripeBuf} into the given buffer + * @param strategy the ReaderStrategy containing the given buffer + * @param offset the offset of the given buffer. Used only when strategy is + * a ByteArrayStrategy + * @param length target length + * @return number of bytes copied + */ + private int copyToTargetBuf(ReaderStrategy strategy, int offset, int length) { + final long offsetInBlk = getOffsetInBlockGroup(); + int bufOffset = getStripedBufOffset(offsetInBlk); + curStripeBuf.position(bufOffset); + return strategy.copyFrom(curStripeBuf, offset, + Math.min(length, curStripeBuf.remaining())); + } + + /** + * The super method {@link DFSInputStream#refreshLocatedBlock} refreshes + * cached LocatedBlock by executing {@link DFSInputStream#getBlockAt} again. + * This method extends the logic by first remembering the index of the + * internal block, and re-parsing the refreshed block group with the same + * index. + */ + @Override + protected LocatedBlock refreshLocatedBlock(LocatedBlock block) + throws IOException { + int idx = BlockIdManager.getBlockIndex(block.getBlock().getLocalBlock()); + LocatedBlock lb = getBlockGroupAt(block.getStartOffset()); + // If indexing information is returned, iterate through the index array + // to find the entry for position idx in the group + LocatedStripedBlock lsb = (LocatedStripedBlock) lb; + int i = 0; + for (; i < lsb.getBlockIndices().length; i++) { + if (lsb.getBlockIndices()[i] == idx) { + break; + } + } + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("refreshLocatedBlock for striped blocks, offset=" + + block.getStartOffset() + ". Obtained block " + lb + ", idx=" + idx); + } + return StripedBlockUtil.constructInternalBlock( + lsb, i, cellSize, dataBlkNum, idx); + } + + private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException { + LocatedBlock lb = super.getBlockAt(offset); + assert lb instanceof LocatedStripedBlock : "NameNode" + + " should return a LocatedStripedBlock for a striped file"; + return (LocatedStripedBlock)lb; + } + + /** + * Real implementation of pread. + */ + @Override + protected void fetchBlockByteRange(LocatedBlock block, long start, + long end, byte[] buf, int offset, + Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) + throws IOException { + // Refresh the striped block group + LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset()); + + AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes( + schema, cellSize, blockGroup, start, end, buf, offset); + CompletionService<Void> readService = new ExecutorCompletionService<>( + dfsClient.getStripedReadsThreadPool()); + final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( + blockGroup, cellSize, dataBlkNum, parityBlkNum); + final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize]; + try { + for (AlignedStripe stripe : stripes) { + // Parse group to get chosen DN location + StripeReader preader = new PositionStripeReader(readService, stripe, + blks, preaderInfos, corruptedBlockMap); + preader.readStripe(); + } + } finally { + for (BlockReaderInfo preaderInfo : preaderInfos) { + closeReader(preaderInfo); + } + } + } + + /** + * The reader for reading a complete {@link AlignedStripe}. 
Note that an + * {@link AlignedStripe} may cross multiple stripes with cellSize width. + */ + private abstract class StripeReader { + final Map<Future<Void>, Integer> futures = new HashMap<>(); + final AlignedStripe alignedStripe; + final CompletionService<Void> service; + final LocatedBlock[] targetBlocks; + final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap; + final BlockReaderInfo[] readerInfos; + + StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe, + LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos, + Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { + this.service = service; + this.alignedStripe = alignedStripe; + this.targetBlocks = targetBlocks; + this.readerInfos = readerInfos; + this.corruptedBlockMap = corruptedBlockMap; + } + + /** prepare all the data chunks */ + abstract void prepareDecodeInputs(); + + /** prepare the parity chunk and block reader if necessary */ + abstract boolean prepareParityChunk(int index) throws IOException; + + abstract void decode(); + + void updateState4SuccessRead(StripingChunkReadResult result) { + Preconditions.checkArgument( + result.state == StripingChunkReadResult.SUCCESSFUL); + readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock() + + alignedStripe.getSpanInBlock()); + } + + private void checkMissingBlocks() throws IOException { + if (alignedStripe.missingChunksNum > parityBlkNum) { + clearFutures(futures.keySet()); + throw new IOException(alignedStripe.missingChunksNum + + " missing blocks, the stripe is: " + alignedStripe); + } + } + + /** + * We need decoding. Thus go through all the data chunks and make sure we + * submit read requests for all of them. + */ + private void readDataForDecoding() throws IOException { + prepareDecodeInputs(); + for (int i = 0; i < dataBlkNum; i++) { + Preconditions.checkNotNull(alignedStripe.chunks[i]); + if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) { + if (!readChunk(targetBlocks[i], i)) { + alignedStripe.missingChunksNum++; + } + } + } + checkMissingBlocks(); + } + + void readParityChunks(int num) throws IOException { + for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num; + i++) { + if (alignedStripe.chunks[i] == null) { + if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) { + j++; + } else { + alignedStripe.missingChunksNum++; + } + } + } + checkMissingBlocks(); + } + + boolean createBlockReader(LocatedBlock block, int chunkIndex) + throws IOException { + BlockReader reader = null; + final ReaderRetryPolicy retry = new ReaderRetryPolicy(); + DNAddrPair dnInfo = new DNAddrPair(null, null, null); + + while(true) { + try { + // the cached block location might have been re-fetched, so always + // get it from cache. 
+ block = refreshLocatedBlock(block); + targetBlocks[chunkIndex] = block; + + // internal block has one location, just rule out the deadNodes + dnInfo = getBestNodeDNAddrPair(block, null); + if (dnInfo == null) { + break; + } + reader = getBlockReader(block, alignedStripe.getOffsetInBlock(), + block.getBlockSize() - alignedStripe.getOffsetInBlock(), + dnInfo.addr, dnInfo.storageType, dnInfo.info); + } catch (IOException e) { + if (e instanceof InvalidEncryptionKeyException && + retry.shouldRefetchEncryptionKey()) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + dnInfo.addr + + " : " + e); + dfsClient.clearDataEncryptionKey(); + retry.refetchEncryptionKey(); + } else if (retry.shouldRefetchToken() && + tokenRefetchNeeded(e, dnInfo.addr)) { + fetchBlockAt(block.getStartOffset()); + retry.refetchToken(); + } else { + //TODO: handles connection issues + DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " + + "block" + block.getBlock(), e); + // re-fetch the block in case the block has been moved + fetchBlockAt(block.getStartOffset()); + addToDeadNodes(dnInfo.info); + } + } + if (reader != null) { + readerInfos[chunkIndex] = new BlockReaderInfo(reader, block, + dnInfo.info, alignedStripe.getOffsetInBlock()); + return true; + } + } + return false; + } + + private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) { + if (chunk.byteBuffer != null) { + ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer); + return new ByteBufferStrategy[]{strategy}; + } else { + ByteBufferStrategy[] strategies = + new ByteBufferStrategy[chunk.byteArray.getOffsets().length]; + for (int i = 0; i < strategies.length; i++) { + ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(), + chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]); + strategies[i] = new ByteBufferStrategy(buffer); + } + return strategies; + } + } + + boolean readChunk(final LocatedBlock block, int chunkIndex) + throws IOException { + final StripingChunk chunk = alignedStripe.chunks[chunkIndex]; + if (block == null) { + chunk.state = StripingChunk.MISSING; + return false; + } + if (readerInfos[chunkIndex] == null) { + if (!createBlockReader(block, chunkIndex)) { + chunk.state = StripingChunk.MISSING; + return false; + } + } else if (readerInfos[chunkIndex].shouldSkip) { + chunk.state = StripingChunk.MISSING; + return false; + } + + chunk.state = StripingChunk.PENDING; + Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader, + readerInfos[chunkIndex].datanode, + readerInfos[chunkIndex].blockReaderOffset, + alignedStripe.getOffsetInBlock(), getReadStrategies(chunk), + block.getBlock(), corruptedBlockMap); + + Future<Void> request = service.submit(readCallable); + futures.put(request, chunkIndex); + return true; + } + + /** read the whole stripe. do decoding if necessary */ + void readStripe() throws IOException { + for (int i = 0; i < dataBlkNum; i++) { + if (alignedStripe.chunks[i] != null && + alignedStripe.chunks[i].state != StripingChunk.ALLZERO) { + if (!readChunk(targetBlocks[i], i)) { + alignedStripe.missingChunksNum++; + } + } + } + // There are missing block locations at this stage. Thus we need to read + // the full stripe and one more parity block. 
+ if (alignedStripe.missingChunksNum > 0) { + checkMissingBlocks(); + readDataForDecoding(); + // read parity chunks + readParityChunks(alignedStripe.missingChunksNum); + } + // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks + + // Input buffers for potential decode operation, which remains null until + // first read failure + while (!futures.isEmpty()) { + try { + StripingChunkReadResult r = StripedBlockUtil + .getNextCompletedStripedRead(service, futures, 0); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " + + alignedStripe); + } + StripingChunk returnedChunk = alignedStripe.chunks[r.index]; + Preconditions.checkNotNull(returnedChunk); + Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING); + + if (r.state == StripingChunkReadResult.SUCCESSFUL) { + returnedChunk.state = StripingChunk.FETCHED; + alignedStripe.fetchedChunksNum++; + updateState4SuccessRead(r); + if (alignedStripe.fetchedChunksNum == dataBlkNum) { + clearFutures(futures.keySet()); + break; + } + } else { + returnedChunk.state = StripingChunk.MISSING; + // close the corresponding reader + closeReader(readerInfos[r.index]); + + final int missing = alignedStripe.missingChunksNum; + alignedStripe.missingChunksNum++; + checkMissingBlocks(); + + readDataForDecoding(); + readParityChunks(alignedStripe.missingChunksNum - missing); + } + } catch (InterruptedException ie) { + String err = "Read request interrupted"; + DFSClient.LOG.error(err); + clearFutures(futures.keySet()); + // Don't decode if read interrupted + throw new InterruptedIOException(err); + } + } + + if (alignedStripe.missingChunksNum > 0) { + decode(); + } + } + } + + class PositionStripeReader extends StripeReader { + private byte[][] decodeInputs = null; + + PositionStripeReader(CompletionService<Void> service, + AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, + BlockReaderInfo[] readerInfos, + Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { + super(service, alignedStripe, targetBlocks, readerInfos, + corruptedBlockMap); + } + + @Override + void prepareDecodeInputs() { + if (decodeInputs == null) { + decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe, + dataBlkNum, parityBlkNum); + } + } + + @Override + boolean prepareParityChunk(int index) { + Preconditions.checkState(index >= dataBlkNum && + alignedStripe.chunks[index] == null); + final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index, + dataBlkNum, parityBlkNum); + alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]); + alignedStripe.chunks[index].addByteArraySlice(0, + (int) alignedStripe.getSpanInBlock()); + return true; + } + + @Override + void decode() { + StripedBlockUtil.finalizeDecodeInputs(decodeInputs, dataBlkNum, + parityBlkNum, alignedStripe); + StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe, + dataBlkNum, parityBlkNum, decoder); + } + } + + class StatefulStripeReader extends StripeReader { + ByteBuffer[] decodeInputs; + + StatefulStripeReader(CompletionService<Void> service, + AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, + BlockReaderInfo[] readerInfos, + Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { + super(service, alignedStripe, targetBlocks, readerInfos, + corruptedBlockMap); + } + + @Override + void prepareDecodeInputs() { + if (decodeInputs == null) { + decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum]; + ByteBuffer cur = curStripeBuf.duplicate(); + StripedBlockUtil.VerticalRange 
range = alignedStripe.range; + for (int i = 0; i < dataBlkNum; i++) { + cur.limit(cur.capacity()); + int pos = (int) (range.offsetInBlock % cellSize + cellSize * i); + cur.position(pos); + cur.limit((int) (pos + range.spanInBlock)); + final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i, + dataBlkNum, parityBlkNum); + decodeInputs[decodeIndex] = cur.slice(); + if (alignedStripe.chunks[i] == null) { + alignedStripe.chunks[i] = new StripingChunk( + decodeInputs[decodeIndex]); + } + } + } + } + + @Override + boolean prepareParityChunk(int index) throws IOException { + Preconditions.checkState(index >= dataBlkNum + && alignedStripe.chunks[index] == null); + if (blockReaders[index] != null && blockReaders[index].shouldSkip) { + alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING); + // we have failed the block reader before + return false; + } + final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index, + dataBlkNum, parityBlkNum); + ByteBuffer buf = getParityBuffer().duplicate(); + buf.position(cellSize * decodeIndex); + buf.limit(cellSize * decodeIndex + (int) alignedStripe.range.spanInBlock); + decodeInputs[decodeIndex] = buf.slice(); + alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]); + return true; + } + + @Override + void decode() { + // TODO no copy for data chunks. this depends on HADOOP-12047 + final int span = (int) alignedStripe.getSpanInBlock(); + for (int i = 0; i < alignedStripe.chunks.length; i++) { + final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i, + dataBlkNum, parityBlkNum); + if (alignedStripe.chunks[i] != null && + alignedStripe.chunks[i].state == StripingChunk.ALLZERO) { + for (int j = 0; j < span; j++) { + decodeInputs[decodeIndex].put((byte) 0); + } + decodeInputs[decodeIndex].flip(); + } else if (alignedStripe.chunks[i] != null && + alignedStripe.chunks[i].state == StripingChunk.FETCHED) { + decodeInputs[decodeIndex].position(0); + decodeInputs[decodeIndex].limit(span); + } + } + int[] decodeIndices = new int[parityBlkNum]; + int pos = 0; + for (int i = 0; i < alignedStripe.chunks.length; i++) { + if (alignedStripe.chunks[i] != null && + alignedStripe.chunks[i].state == StripingChunk.MISSING) { + int decodeIndex = StripedBlockUtil.convertIndex4Decode(i, + dataBlkNum, parityBlkNum); + if (i < dataBlkNum) { + decodeIndices[pos++] = decodeIndex; + } else { + decodeInputs[decodeIndex] = null; + } + } + } + decodeIndices = Arrays.copyOf(decodeIndices, pos); + + final int decodeChunkNum = decodeIndices.length; + ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum]; + for (int i = 0; i < decodeChunkNum; i++) { + outputs[i] = decodeInputs[decodeIndices[i]]; + outputs[i].position(0); + outputs[i].limit((int) alignedStripe.range.spanInBlock); + decodeInputs[decodeIndices[i]] = null; + } + + decoder.decode(decodeInputs, decodeIndices, outputs); + } + } + + /** + * May need online read recovery, zero-copy read doesn't make + * sense, so don't support it. 
+ */ + @Override + public synchronized ByteBuffer read(ByteBufferPool bufferPool, + int maxLength, EnumSet<ReadOption> opts) + throws IOException, UnsupportedOperationException { + throw new UnsupportedOperationException( + "Enhanced byte buffer access is not supported."); + } + + @Override + public synchronized void releaseBuffer(ByteBuffer buffer) { + throw new UnsupportedOperationException( + "Enhanced byte buffer access is not supported."); + } + + /** A variation of {@link DFSInputStream#cancelAll} */ + private void clearFutures(Collection<Future<Void>> futures) { + for (Future<Void> future : futures) { + future.cancel(false); + } + futures.clear(); + } +}
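An illustrative aside (not part of the patch): DFSStripedInputStream keeps translating a logical offset inside a block group into (stripe index, internal block index, offset within the cell); readOneStripe() and getStripedBufOffset() both build on the same modular arithmetic over cellSize and dataBlkNum. The standalone sketch below shows that arithmetic in isolation; the (6, 3) layout and 64 KB cell size are assumed example values, not taken from the patch.

// Sketch only: the stripe/cell arithmetic used by the stateful read path above.
public class StripedOffsetMathExample {
  static final int CELL_SIZE = 64 * 1024;   // assumed example cell size
  static final int DATA_BLK_NUM = 6;        // assumed number of data units
  static final int PARITY_BLK_NUM = 3;      // assumed number of parity units

  public static void main(String[] args) {
    long offsetInBlockGroup = 3L * CELL_SIZE + 1234;   // arbitrary logical offset
    long stripeLen = (long) CELL_SIZE * DATA_BLK_NUM;  // data bytes per full stripe

    long stripeIndex = offsetInBlockGroup / stripeLen;    // which stripe holds the byte
    long offsetInStripe = offsetInBlockGroup % stripeLen; // position inside that stripe
    long cellIndex = offsetInStripe / CELL_SIZE;          // which data cell, i.e. which
                                                          // internal block (0..DATA_BLK_NUM-1)
    long offsetInCell = offsetInStripe % CELL_SIZE;       // position inside the cell

    System.out.println("stripe=" + stripeIndex + ", internalBlock=" + cellIndex
        + ", offsetInCell=" + offsetInCell);
    // The stateful reader buffers one full stripe (stripeLen bytes) in curStripeBuf and
    // serves subsequent reads out of it; a seek inside the buffered stripe only needs
    // offsetInStripe, which is what getStripedBufOffset() computes.
  }
}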
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java new file mode 100644 index 0000000..746b791 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -0,0 +1,653 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Progressable; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +import com.google.common.base.Preconditions; + + +/** + * This class supports writing files in striped layout and erasure coded format. + * Each stripe contains a sequence of cells. 
+ */ [email protected] +public class DFSStripedOutputStream extends DFSOutputStream { + static class MultipleBlockingQueue<T> { + private final List<BlockingQueue<T>> queues; + + MultipleBlockingQueue(int numQueue, int queueSize) { + queues = new ArrayList<>(numQueue); + for (int i = 0; i < numQueue; i++) { + queues.add(new LinkedBlockingQueue<T>(queueSize)); + } + } + + boolean isEmpty() { + for(int i = 0; i < queues.size(); i++) { + if (!queues.get(i).isEmpty()) { + return false; + } + } + return true; + } + + int numQueues() { + return queues.size(); + } + + void offer(int i, T object) { + final boolean b = queues.get(i).offer(object); + Preconditions.checkState(b, "Failed to offer " + object + + " to queue, i=" + i); + } + + T take(int i) throws InterruptedIOException { + try { + return queues.get(i).take(); + } catch(InterruptedException ie) { + throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, ie); + } + } + + T poll(int i) { + return queues.get(i).poll(); + } + + T peek(int i) { + return queues.get(i).peek(); + } + } + + /** Coordinate the communication between the streamers. */ + class Coordinator { + private final MultipleBlockingQueue<LocatedBlock> followingBlocks; + private final MultipleBlockingQueue<ExtendedBlock> endBlocks; + + private final MultipleBlockingQueue<LocatedBlock> newBlocks; + private final MultipleBlockingQueue<ExtendedBlock> updateBlocks; + + Coordinator(final DfsClientConf conf, final int numDataBlocks, + final int numAllBlocks) { + followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); + endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1); + + newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); + updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); + } + + MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() { + return followingBlocks; + } + + MultipleBlockingQueue<LocatedBlock> getNewBlocks() { + return newBlocks; + } + + MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() { + return updateBlocks; + } + + StripedDataStreamer getStripedDataStreamer(int i) { + return DFSStripedOutputStream.this.getStripedDataStreamer(i); + } + + void offerEndBlock(int i, ExtendedBlock block) { + endBlocks.offer(i, block); + } + + ExtendedBlock takeEndBlock(int i) throws InterruptedIOException { + return endBlocks.take(i); + } + + boolean hasAllEndBlocks() { + for(int i = 0; i < endBlocks.numQueues(); i++) { + if (endBlocks.peek(i) == null) { + return false; + } + } + return true; + } + + void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) { + ExtendedBlock b = endBlocks.peek(i); + if (b == null) { + // streamer just has failed, put end block and continue + b = block; + offerEndBlock(i, b); + } + b.setNumBytes(newBytes); + } + + /** @return a block representing the entire block group. */ + ExtendedBlock getBlockGroup() { + final StripedDataStreamer s0 = getStripedDataStreamer(0); + final ExtendedBlock b0 = s0.getBlock(); + if (b0 == null) { + return null; + } + + final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0; + final ExtendedBlock block = new ExtendedBlock(b0); + long numBytes = b0.getNumBytes(); + for (int i = 1; i < numDataBlocks; i++) { + final StripedDataStreamer si = getStripedDataStreamer(i); + final ExtendedBlock bi = si.getBlock(); + if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) { + block.setGenerationStamp(bi.getGenerationStamp()); + } + numBytes += atBlockGroupBoundary? 
bi.getNumBytes(): si.getBytesCurBlock(); + } + block.setNumBytes(numBytes); + if (LOG.isDebugEnabled()) { + LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes()); + } + return block; + } + } + + /** Buffers for writing the data and parity cells of a stripe. */ + class CellBuffers { + private final ByteBuffer[] buffers; + private final byte[][] checksumArrays; + + CellBuffers(int numParityBlocks) throws InterruptedException{ + if (cellSize % bytesPerChecksum != 0) { + throw new HadoopIllegalArgumentException("Invalid values: " + + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + + bytesPerChecksum + ") must divide cell size (=" + cellSize + ")."); + } + + checksumArrays = new byte[numParityBlocks][]; + final int size = getChecksumSize() * (cellSize / bytesPerChecksum); + for (int i = 0; i < checksumArrays.length; i++) { + checksumArrays[i] = new byte[size]; + } + + buffers = new ByteBuffer[numAllBlocks]; + for (int i = 0; i < buffers.length; i++) { + buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize)); + } + } + + private ByteBuffer[] getBuffers() { + return buffers; + } + + byte[] getChecksumArray(int i) { + return checksumArrays[i - numDataBlocks]; + } + + private int addTo(int i, byte[] b, int off, int len) { + final ByteBuffer buf = buffers[i]; + final int pos = buf.position() + len; + Preconditions.checkState(pos <= cellSize); + buf.put(b, off, len); + return pos; + } + + private void clear() { + for (int i = 0; i< numAllBlocks; i++) { + buffers[i].clear(); + if (i >= numDataBlocks) { + Arrays.fill(buffers[i].array(), (byte) 0); + } + } + } + + private void release() { + for (int i = 0; i < numAllBlocks; i++) { + byteArrayManager.release(buffers[i].array()); + } + } + + private void flipDataBuffers() { + for (int i = 0; i < numDataBlocks; i++) { + buffers[i].flip(); + } + } + } + + private final Coordinator coordinator; + private final CellBuffers cellBuffers; + private final RawErasureEncoder encoder; + private final List<StripedDataStreamer> streamers; + private final DFSPacket[] currentPackets; // current Packet of each streamer + + /** Size of each striping cell, must be a multiple of bytesPerChecksum */ + private final int cellSize; + private final int numAllBlocks; + private final int numDataBlocks; + + @Override + ExtendedBlock getBlock() { + return coordinator.getBlockGroup(); + } + + /** Construct a new output stream for creating a file. 
*/ + DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, + EnumSet<CreateFlag> flag, Progressable progress, + DataChecksum checksum, String[] favoredNodes) + throws IOException { + super(dfsClient, src, stat, flag, progress, checksum, favoredNodes); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating DFSStripedOutputStream for " + src); + } + + final ECSchema schema = stat.getECSchema(); + final int numParityBlocks = schema.getNumParityUnits(); + cellSize = stat.getStripeCellSize(); + numDataBlocks = schema.getNumDataUnits(); + numAllBlocks = numDataBlocks + numParityBlocks; + + encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(), + numDataBlocks, numParityBlocks); + + coordinator = new Coordinator(dfsClient.getConf(), + numDataBlocks, numAllBlocks); + try { + cellBuffers = new CellBuffers(numParityBlocks); + } catch (InterruptedException ie) { + throw DFSUtil.toInterruptedIOException( + "Failed to create cell buffers", ie); + } + + List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks); + for (short i = 0; i < numAllBlocks; i++) { + StripedDataStreamer streamer = new StripedDataStreamer(stat, + dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, + favoredNodes, i, coordinator); + s.add(streamer); + } + streamers = Collections.unmodifiableList(s); + currentPackets = new DFSPacket[streamers.size()]; + setCurrentStreamer(0); + } + + StripedDataStreamer getStripedDataStreamer(int i) { + return streamers.get(i); + } + + int getCurrentIndex() { + return getCurrentStreamer().getIndex(); + } + + private synchronized StripedDataStreamer getCurrentStreamer() { + return (StripedDataStreamer)streamer; + } + + private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) + throws IOException { + // backup currentPacket for current streamer + int oldIdx = streamers.indexOf(streamer); + if (oldIdx >= 0) { + currentPackets[oldIdx] = currentPacket; + } + + streamer = streamers.get(newIdx); + currentPacket = currentPackets[newIdx]; + adjustChunkBoundary(); + + return getCurrentStreamer(); + } + + /** + * Encode the buffers, i.e. compute parities. 
+ * + * @param buffers data buffers + parity buffers + */ + private static void encode(RawErasureEncoder encoder, int numData, + ByteBuffer[] buffers) { + final ByteBuffer[] dataBuffers = new ByteBuffer[numData]; + final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData]; + System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length); + System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length); + + encoder.encode(dataBuffers, parityBuffers); + } + + + private void checkStreamers() throws IOException { + int count = 0; + for(StripedDataStreamer s : streamers) { + if (!s.isFailed()) { + if (s.getBlock() != null) { + s.getErrorState().initExternalError(); + } + count++; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("checkStreamers: " + streamers); + LOG.debug("count=" + count); + } + if (count < numDataBlocks) { + throw new IOException("Failed: the number of remaining blocks = " + + count + " < the number of data blocks = " + numDataBlocks); + } + } + + private void handleStreamerFailure(String err, + Exception e) throws IOException { + LOG.warn("Failed: " + err + ", " + this, e); + getCurrentStreamer().setFailed(true); + checkStreamers(); + currentPacket = null; + } + + @Override + protected synchronized void writeChunk(byte[] bytes, int offset, int len, + byte[] checksum, int ckoff, int cklen) throws IOException { + final int index = getCurrentIndex(); + final StripedDataStreamer current = getCurrentStreamer(); + final int pos = cellBuffers.addTo(index, bytes, offset, len); + final boolean cellFull = pos == cellSize; + + final long oldBytes = current.getBytesCurBlock(); + if (!current.isFailed()) { + try { + super.writeChunk(bytes, offset, len, checksum, ckoff, cklen); + } catch(Exception e) { + handleStreamerFailure("offset=" + offset + ", length=" + len, e); + } + } + + if (current.isFailed()) { + final long newBytes = oldBytes + len; + coordinator.setBytesEndBlock(index, newBytes, current.getBlock()); + current.setBytesCurBlock(newBytes); + } + + // Two extra steps are needed when a striping cell is full: + // 1. Forward the current index pointer + // 2. Generate parity packets if a full stripe of data cells are present + if (cellFull) { + int next = index + 1; + //When all data cells in a stripe are ready, we need to encode + //them and generate some parity cells. These cells will be + //converted to packets and put to their DataStreamer's queue. 
+ if (next == numDataBlocks) { + cellBuffers.flipDataBuffers(); + writeParityCells(); + next = 0; + } + setCurrentStreamer(next); + } + } + + private int stripeDataSize() { + return numDataBlocks * cellSize; + } + + @Override + public void hflush() { + throw new UnsupportedOperationException(); + } + + @Override + public void hsync() { + throw new UnsupportedOperationException(); + } + + @Override + protected synchronized void start() { + for (StripedDataStreamer streamer : streamers) { + streamer.start(); + } + } + + @Override + synchronized void abort() throws IOException { + if (isClosed()) { + return; + } + for (StripedDataStreamer streamer : streamers) { + streamer.getLastException().set(new IOException("Lease timeout of " + + (dfsClient.getConf().getHdfsTimeout()/1000) + + " seconds expired.")); + } + closeThreads(true); + dfsClient.endFileLease(fileId); + } + + @Override + boolean isClosed() { + if (closed) { + return true; + } + for(StripedDataStreamer s : streamers) { + if (!s.streamerClosed()) { + return false; + } + } + return true; + } + + @Override + protected void closeThreads(boolean force) throws IOException { + final MultipleIOException.Builder b = new MultipleIOException.Builder(); + try { + for (StripedDataStreamer streamer : streamers) { + try { + streamer.close(force); + streamer.join(); + streamer.closeSocket(); + } catch (Exception e) { + try { + handleStreamerFailure("force=" + force, e); + } catch (IOException ioe) { + b.add(ioe); + } + } finally { + streamer.setSocketToNull(); + } + } + } finally { + setClosed(); + } + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } + } + + /** + * Simply add bytesCurBlock together. Note that this result is not accurately + * the size of the block group. + */ + private long getCurrentSumBytes() { + long sum = 0; + for (int i = 0; i < numDataBlocks; i++) { + sum += streamers.get(i).getBytesCurBlock(); + } + return sum; + } + + private void writeParityCellsForLastStripe() throws IOException { + final long currentBlockGroupBytes = getCurrentSumBytes(); + if (currentBlockGroupBytes % stripeDataSize() == 0) { + return; + } + + final int firstCellSize = + (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize); + final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize? + firstCellSize : cellSize; + final ByteBuffer[] buffers = cellBuffers.getBuffers(); + + for (int i = 0; i < numAllBlocks; i++) { + // Pad zero bytes to make all cells exactly the size of parityCellSize + // If internal block is smaller than parity block, pad zero bytes. 
+ // Also pad zero bytes to all parity cells + final int position = buffers[i].position(); + assert position <= parityCellSize : "If an internal block is smaller" + + " than parity block, then its last cell should be small than last" + + " parity cell"; + for (int j = 0; j < parityCellSize - position; j++) { + buffers[i].put((byte) 0); + } + buffers[i].flip(); + } + + writeParityCells(); + } + + void writeParityCells() throws IOException { + final ByteBuffer[] buffers = cellBuffers.getBuffers(); + //encode the data cells + encode(encoder, numDataBlocks, buffers); + for (int i = numDataBlocks; i < numAllBlocks; i++) { + writeParity(i, buffers[i], cellBuffers.getChecksumArray(i)); + } + cellBuffers.clear(); + } + + void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf + ) throws IOException { + final StripedDataStreamer current = setCurrentStreamer(index); + final int len = buffer.limit(); + + final long oldBytes = current.getBytesCurBlock(); + if (!current.isFailed()) { + try { + DataChecksum sum = getDataChecksum(); + sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0); + for (int i = 0; i < len; i += sum.getBytesPerChecksum()) { + int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i); + int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize(); + super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset, + getChecksumSize()); + } + } catch(Exception e) { + handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e); + } + } + + if (current.isFailed()) { + final long newBytes = oldBytes + len; + current.setBytesCurBlock(newBytes); + } + } + + @Override + void setClosed() { + super.setClosed(); + for (int i = 0; i < numAllBlocks; i++) { + streamers.get(i).release(); + } + cellBuffers.release(); + } + + @Override + protected synchronized void closeImpl() throws IOException { + if (isClosed()) { + final MultipleIOException.Builder b = new MultipleIOException.Builder(); + for(int i = 0; i < streamers.size(); i++) { + final StripedDataStreamer si = getStripedDataStreamer(i); + try { + si.getLastException().check(true); + } catch (IOException e) { + b.add(e); + } + } + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } + return; + } + + try { + // flush from all upper layers + try { + flushBuffer(); + // if the last stripe is incomplete, generate and write parity cells + writeParityCellsForLastStripe(); + enqueueAllCurrentPackets(); + } catch(Exception e) { + handleStreamerFailure("closeImpl", e); + } + + for (int i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer s = setCurrentStreamer(i); + if (!s.isFailed()) { + try { + if (s.getBytesCurBlock() > 0) { + setCurrentPacketToEmpty(); + } + // flush all data to Datanode + flushInternal(); + } catch(Exception e) { + handleStreamerFailure("closeImpl", e); + } + } + } + + closeThreads(false); + final ExtendedBlock lastBlock = coordinator.getBlockGroup(); + TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); + try { + completeFile(lastBlock); + } finally { + scope.close(); + } + dfsClient.endFileLease(fileId); + } catch (ClosedChannelException ignored) { + } finally { + setClosed(); + } + } + + private void enqueueAllCurrentPackets() throws IOException { + int idx = streamers.indexOf(getCurrentStreamer()); + for(int i = 0; i < streamers.size(); i++) { + setCurrentStreamer(i); + if (currentPacket != null) { + enqueueCurrentPacket(); + } + } + setCurrentStreamer(idx); + } +} 
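An illustrative aside (not part of the patch): writeChunk() above fills the cell buffers round-robin, one data streamer per cell, and once the last data cell of a stripe is full it encodes the stripe into parity cells before wrapping back to streamer 0. The sketch below reproduces only that cycling rule with plain ByteBuffers; the tiny cell size, the block counts, and the class name are assumed for illustration, and the real encode step (RawErasureEncoder plus writeParityCells) is reduced to a stub.

import java.nio.ByteBuffer;

// Sketch only: the cell-cycling rule of the striped write path, with a stubbed encode step.
public class CellCyclingSketch {
  static final int CELL_SIZE = 4;    // assumed tiny cell size, for illustration
  static final int NUM_DATA = 3;     // assumed data cells per stripe
  static final int NUM_PARITY = 2;   // assumed parity cells per stripe

  static final ByteBuffer[] cells = new ByteBuffer[NUM_DATA + NUM_PARITY];
  static int current = 0;            // index of the data cell currently being filled

  public static void main(String[] args) {
    for (int i = 0; i < cells.length; i++) {
      cells[i] = ByteBuffer.allocate(CELL_SIZE);
    }
    byte[] data = new byte[26];      // 26 bytes -> two full stripes plus a partial cell
    for (byte b : data) {
      write(b);
    }
  }

  static void write(byte b) {
    cells[current].put(b);
    if (!cells[current].hasRemaining()) {   // the current data cell is full
      int next = current + 1;
      if (next == NUM_DATA) {               // a full stripe of data cells is ready
        encodeParity();                     // stand-in for encode() + writeParityCells()
        next = 0;                           // wrap back to the first data cell
      }
      current = next;
    }
  }

  static void encodeParity() {
    // In DFSStripedOutputStream this is where the RS raw encoder fills the parity cells
    // and their packets are queued to the parity streamers; here the data cells are
    // simply cleared so the next stripe can be written.
    System.out.println("stripe complete -> encode " + NUM_PARITY + " parity cells");
    for (int i = 0; i < NUM_DATA; i++) {
      cells[i].clear();
    }
  }
}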
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index cae56c0..c06a435 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -36,6 +36,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PAS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.PrintStream; import java.io.UnsupportedEncodingException; import java.net.InetAddress; @@ -55,7 +56,6 @@ import java.util.concurrent.ThreadLocalRandom; import javax.net.SocketFactory; -import com.google.common.collect.Sets; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Option; @@ -96,6 +96,7 @@ import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.protobuf.BlockingService; @InterfaceAudience.Private @@ -1527,4 +1528,10 @@ public class DFSUtil { DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty(); } + public static InterruptedIOException toInterruptedIOException(String message, + InterruptedException e) { + final InterruptedIOException iioe = new InterruptedIOException(message); + iioe.initCause(e); + return iioe; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 8dd85b7..c78199e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -209,6 +209,7 @@ class DataStreamer extends Daemon { static class ErrorState { private boolean error = false; + private boolean externalError = false; private int badNodeIndex = -1; private int restartingNodeIndex = -1; private long restartingNodeDeadline = 0; @@ -220,6 +221,7 @@ class DataStreamer extends Daemon { synchronized void reset() { error = false; + externalError = false; badNodeIndex = -1; restartingNodeIndex = -1; restartingNodeDeadline = 0; @@ -229,14 +231,24 @@ class DataStreamer extends Daemon { return error; } + synchronized boolean hasExternalErrorOnly() { + return error && externalError && !isNodeMarked(); + } + synchronized boolean hasDatanodeError() { - return error && isNodeMarked(); + return error && (isNodeMarked() || externalError); } synchronized void setError(boolean err) { this.error = err; } + synchronized void initExternalError() { + setError(true); + this.externalError = true; + } + + synchronized void setBadNodeIndex(int index) { this.badNodeIndex = index; } @@ -335,7 +347,7 @@ class DataStreamer extends Daemon { } private 
volatile boolean streamerClosed = false; - private ExtendedBlock block; // its length is number of bytes acked + protected ExtendedBlock block; // its length is number of bytes acked private Token<BlockTokenIdentifier> accessToken; private DataOutputStream blockStream; private DataInputStream blockReplyStream; @@ -366,12 +378,12 @@ class DataStreamer extends Daemon { private final LastExceptionInStreamer lastException = new LastExceptionInStreamer(); private Socket s; - private final DFSClient dfsClient; - private final String src; + protected final DFSClient dfsClient; + protected final String src; /** Only for DataTransferProtocol.writeBlock(..) */ private final DataChecksum checksum4WriteBlock; private final Progressable progress; - private final HdfsFileStatus stat; + protected final HdfsFileStatus stat; // appending to existing partial block private volatile boolean appendChunk = false; // both dataQueue and ackQueue are protected by dataQueue lock @@ -397,11 +409,13 @@ class DataStreamer extends Daemon { private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes; private final String[] favoredNodes; - private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, + private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, + DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage, boolean isAppend, String[] favoredNodes) { + this.block = block; this.dfsClient = dfsClient; this.src = src; this.progress = progress; @@ -426,9 +440,8 @@ class DataStreamer extends Daemon { String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage, String[] favoredNodes) { - this(stat, dfsClient, src, progress, checksum, cachingStrategy, + this(stat, block, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, false, favoredNodes); - this.block = block; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -442,10 +455,9 @@ class DataStreamer extends Daemon { String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage) throws IOException { - this(stat, dfsClient, src, progress, checksum, cachingStrategy, + this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, true, null); stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; - block = lastBlock.getBlock(); bytesSent = block.getNumBytes(); accessToken = lastBlock.getBlockToken(); } @@ -488,7 +500,7 @@ class DataStreamer extends Daemon { stage = BlockConstructionStage.DATA_STREAMING; } - private void endBlock() { + protected void endBlock() { if(LOG.isDebugEnabled()) { LOG.debug("Closing old block " + block); } @@ -574,7 +586,7 @@ class DataStreamer extends Daemon { // get new block from namenode. if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { if(LOG.isDebugEnabled()) { - LOG.debug("Allocating new block"); + LOG.debug("Allocating new block " + this); } setPipeline(nextBlockOutputStream()); initDataStreaming(); @@ -592,10 +604,7 @@ class DataStreamer extends Daemon { long lastByteOffsetInBlock = one.getLastByteOffsetBlock(); if (lastByteOffsetInBlock > stat.getBlockSize()) { throw new IOException("BlockSize " + stat.getBlockSize() + - " is smaller than data size. 
" + - " Offset of packet in block " + - lastByteOffsetInBlock + - " Aborting file " + src); + " < lastByteOffsetInBlock, " + this + ", " + one); } if (one.isLastPacketInBlock()) { @@ -1069,6 +1078,10 @@ class DataStreamer extends Daemon { if (!errorState.hasDatanodeError()) { return false; } + if (errorState.hasExternalErrorOnly() && block == null) { + // block is not yet initialized, handle external error later. + return false; + } if (response != null) { LOG.info("Error Recovery for " + block + " waiting for responder to exit. "); @@ -1397,15 +1410,28 @@ class DataStreamer extends Daemon { } LocatedBlock updateBlockForPipeline() throws IOException { + return callUpdateBlockForPipeline(block); + } + + LocatedBlock callUpdateBlockForPipeline(ExtendedBlock newBlock) throws IOException { return dfsClient.namenode.updateBlockForPipeline( - block, dfsClient.clientName); + newBlock, dfsClient.clientName); + } + + static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) { + return new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), + b.getNumBytes(), newGS); } /** update pipeline at the namenode */ ExtendedBlock updatePipeline(long newGS) throws IOException { - final ExtendedBlock newBlock = new ExtendedBlock( - block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS); - dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, + final ExtendedBlock newBlock = newBlock(block, newGS); + return callUpdatePipeline(block, newBlock); + } + + ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock) + throws IOException { + dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock, nodes, storageIDs); return newBlock; } @@ -1738,6 +1764,10 @@ class DataStreamer extends Daemon { return accessToken; } + ErrorState getErrorState() { + return errorState; + } + /** * Put a packet to the data queue * @@ -1750,7 +1780,7 @@ class DataStreamer extends Daemon { dataQueue.addLast(packet); lastQueuedSeqno = packet.getSeqno(); if (LOG.isDebugEnabled()) { - LOG.debug("Queued packet " + packet.getSeqno()); + LOG.debug("Queued " + packet + ", " + this); } dataQueue.notifyAll(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 0197cfb..4c9f9cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; @@ -89,6 +90,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.namenode.NameNode; 
import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; @@ -2298,4 +2300,68 @@ public class DistributedFileSystem extends FileSystem { throws IOException { return dfs.getInotifyEventStream(lastReadTxid); } + + /** + * Create an erasure coding zone + * + * @param path Directory to create the EC zone + * @param schema ECSchema for the zone. If not specified, the default schema will be used. + * @param cellSize Cell size for the striped erasure coding + * @throws IOException + */ + public void createErasureCodingZone(final Path path, final ECSchema schema, + final int cellSize) throws IOException { + Path absF = fixRelativePart(path); + new FileSystemLinkResolver<Void>() { + @Override + public Void doCall(final Path p) throws IOException, + UnresolvedLinkException { + dfs.createErasureCodingZone(getPathName(p), schema, cellSize); + return null; + } + + @Override + public Void next(final FileSystem fs, final Path p) throws IOException { + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem myDfs = (DistributedFileSystem) fs; + myDfs.createErasureCodingZone(p, schema, cellSize); + return null; + } + throw new UnsupportedOperationException( + "Cannot createErasureCodingZone through a symlink to a " + + "non-DistributedFileSystem: " + path + " -> " + p); + } + }.resolve(this, absF); + } + + /** + * Get the erasure coding zone information for the specified path + * + * @param path The path to query + * @return the zone information if the path is in an EC zone, null otherwise + * @throws IOException + */ + public ErasureCodingZone getErasureCodingZone(final Path path) + throws IOException { + Path absF = fixRelativePart(path); + return new FileSystemLinkResolver<ErasureCodingZone>() { + @Override + public ErasureCodingZone doCall(final Path p) throws IOException, + UnresolvedLinkException { + return dfs.getErasureCodingZone(getPathName(p)); + } + + @Override + public ErasureCodingZone next(final FileSystem fs, final Path p) + throws IOException { + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem myDfs = (DistributedFileSystem) fs; + return myDfs.getErasureCodingZone(p); + } + throw new UnsupportedOperationException( + "Cannot getErasureCodingZone through a symlink to a " + + "non-DistributedFileSystem: " + path + " -> " + p); + } + }.resolve(this, absF); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index d70f419..70cce7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -505,4 +505,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { return null; } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index c368d65..cce44b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -474,4 +474,9 @@ public class RemoteBlockReader2 implements BlockReader { public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { return null; } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java new file mode 100644 index 0000000..2d51dc4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator; +import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.util.ByteArrayManager; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Progressable; + +/** + * This class extends {@link DataStreamer} to support writing striped blocks + * to datanodes. + * A {@link DFSStripedOutputStream} has multiple {@link StripedDataStreamer}s. 
+ * Whenever the streamers need to talk to the namenode, only the fastest streamer + sends an RPC call to the namenode and then populates the result for the + other streamers. + */ +public class StripedDataStreamer extends DataStreamer { + /** + * This class is designed for multiple threads to share a + * {@link MultipleBlockingQueue}. Initially, the queue is empty. The earliest + * thread calling poll populates entries into the queue and the other threads + * will wait for it. Once the entries are populated, all the threads can poll + * their entries. + * + * @param <T> the queue entry type. + */ + static abstract class ConcurrentPoll<T> { + private final MultipleBlockingQueue<T> queue; + + ConcurrentPoll(MultipleBlockingQueue<T> queue) { + this.queue = queue; + } + + T poll(final int i) throws IOException { + for(;;) { + synchronized(queue) { + final T polled = queue.poll(i); + if (polled != null) { // already populated; return polled item. + return polled; + } + if (isReady2Populate()) { + populate(); + return queue.poll(i); + } + } + + // sleep and then retry. + try { + Thread.sleep(100); + } catch(InterruptedException ie) { + throw DFSUtil.toInterruptedIOException( + "Sleep interrupted during poll", ie); + } + } + } + + boolean isReady2Populate() { + return queue.isEmpty(); + } + + abstract void populate() throws IOException; + } + + private final Coordinator coordinator; + private final int index; + private volatile boolean failed; + + StripedDataStreamer(HdfsFileStatus stat, + DFSClient dfsClient, String src, + Progressable progress, DataChecksum checksum, + AtomicReference<CachingStrategy> cachingStrategy, + ByteArrayManager byteArrayManage, String[] favoredNodes, + short index, Coordinator coordinator) { + super(stat, null, dfsClient, src, progress, checksum, cachingStrategy, + byteArrayManage, favoredNodes); + this.index = index; + this.coordinator = coordinator; + } + + int getIndex() { + return index; + } + + void setFailed(boolean failed) { + this.failed = failed; + } + + boolean isFailed() { + return failed; + } + + private boolean isParityStreamer() { + return index >= NUM_DATA_BLOCKS; + } + + @Override + protected void endBlock() { + if (!isParityStreamer()) { + coordinator.offerEndBlock(index, block); + } + super.endBlock(); + } + + @Override + protected LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes) + throws IOException { + final MultipleBlockingQueue<LocatedBlock> followingBlocks + = coordinator.getFollowingBlocks(); + return new ConcurrentPoll<LocatedBlock>(followingBlocks) { + @Override + boolean isReady2Populate() { + return super.isReady2Populate() + && (block == null || coordinator.hasAllEndBlocks()); + } + + @Override + void populate() throws IOException { + getLastException().check(false); + + if (block != null) { + // set numBytes for the previous block group + long bytes = 0; + for (int i = 0; i < NUM_DATA_BLOCKS; i++) { + final ExtendedBlock b = coordinator.takeEndBlock(i); + StripedBlockUtil.checkBlocks(index, block, i, b); + bytes += b.getNumBytes(); + } + block.setNumBytes(bytes); + block.setBlockId(block.getBlockId() - index); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("locateFollowingBlock: index=" + index + ", block=" + block); + } + + final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock( + excludedNodes); + final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( + (LocatedStripedBlock)lb, + BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); + + for (int i = 0; i < blocks.length; i++) {
+ if (!coordinator.getStripedDataStreamer(i).isFailed()) { + if (blocks[i] == null) { + getLastException().set( + new IOException("Failed to get following block, i=" + i)); + } else { + followingBlocks.offer(i, blocks[i]); + } + } + } + } + }.poll(index); + } + + @Override + LocatedBlock updateBlockForPipeline() throws IOException { + final MultipleBlockingQueue<LocatedBlock> newBlocks + = coordinator.getNewBlocks(); + return new ConcurrentPoll<LocatedBlock>(newBlocks) { + @Override + void populate() throws IOException { + final ExtendedBlock bg = coordinator.getBlockGroup(); + final LocatedBlock updated = callUpdateBlockForPipeline(bg); + final long newGS = updated.getBlock().getGenerationStamp(); + final LocatedBlock[] updatedBlks = StripedBlockUtil + .parseStripedBlockGroup((LocatedStripedBlock) updated, + BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); + for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { + final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock(); + if (bi != null) { + final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS), + null, null, null, -1, updated.isCorrupt(), null); + lb.setBlockToken(updatedBlks[i].getBlockToken()); + newBlocks.offer(i, lb); + } else { + final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i); + lb.getBlock().setGenerationStamp(newGS); + } + } + } + }.poll(index); + } + + @Override + ExtendedBlock updatePipeline(final long newGS) throws IOException { + final MultipleBlockingQueue<ExtendedBlock> updateBlocks + = coordinator.getUpdateBlocks(); + return new ConcurrentPoll<ExtendedBlock>(updateBlocks) { + @Override + void populate() throws IOException { + final ExtendedBlock bg = coordinator.getBlockGroup(); + final ExtendedBlock newBG = newBlock(bg, newGS); + final ExtendedBlock updated = callUpdatePipeline(bg, newBG); + for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { + final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock(); + updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp())); + } + } + }.poll(index); + } + + @Override + public String toString() { + return "#" + index + ": failed? " + Boolean.toString(failed).charAt(0) + + ", " + super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java index 84499bb..5a3c885 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java @@ -37,9 +37,11 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.EncryptionZone; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.hdfs.tools.DFSAdmin; +import org.apache.hadoop.io.erasurecode.ECSchema; /** * The public API for performing administrative functions on HDFS. 
Those writing @@ -363,4 +365,42 @@ public class HdfsAdmin { throws IOException { dfs.setStoragePolicy(src, policyName); } + + /** + * Create an erasure coding zone + * + * @param path + * Directory to create the erasure coding zone + * @param schema + * ECSchema for the zone. If not specified, the default schema will be used. + * @param cellSize + * Cell size for the striped erasure coding + * @throws IOException + */ + public void createErasureCodingZone(final Path path, final ECSchema schema, + final int cellSize) throws IOException { + dfs.createErasureCodingZone(path, schema, cellSize); + } + + /** + * Get the erasure coding zone information for the specified path + * + * @param path + * The path to query + * @return the zone information if the path is in an EC zone, null otherwise + * @throws IOException + */ + public ErasureCodingZone getErasureCodingZone(final Path path) + throws IOException { + return dfs.getErasureCodingZone(path); + } + + /** + * Get the supported erasure coding schemas. + * + * @return the supported ECSchemas + * @throws IOException + */ + public ECSchema[] getECSchemas() throws IOException { + return dfs.getClient().getECSchemas(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index a257e32..9aef436 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -38,6 +38,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIM import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -101,6 +102,9 @@ public class DfsClientConf { private final long hedgedReadThresholdMillis; private final int hedgedReadThreadpoolSize; + private final int stripedReadThreadpoolSize; + + public DfsClientConf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout hdfsTimeout = Client.getTimeout(conf); @@ -191,7 +195,7 @@ public class DfsClientConf { connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); hdfsBlocksMetadataEnabled = conf.getBoolean( - DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); fileBlockStorageLocationsNumThreads = conf.getInt( DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS, @@ -215,6 +219,13 @@ public class DfsClientConf { hedgedReadThreadpoolSize = conf.getInt( HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT); + + stripedReadThreadpoolSize = conf.getInt( + HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY, + HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT); + Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " + +
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY + + " must be greater than 0."); } private DataChecksum.Type getChecksumType(Configuration conf) { @@ -492,6 +503,13 @@ public class DfsClientConf { } /** + * @return the stripedReadThreadpoolSize + */ + public int getStripedReadThreadpoolSize() { + return stripedReadThreadpoolSize; + } + + /** * @return the shortCircuitConf */ public ShortCircuitConf getShortCircuitConf() { @@ -744,4 +762,4 @@ public class DfsClientConf { return builder.toString(); } } -} \ No newline at end of file +}
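A few usage notes on the client-facing pieces of this patch follow; none of the snippets below are part of the commit. First, DFSUtil.toInterruptedIOException simply wraps an InterruptedException in an InterruptedIOException so that interruption can be rethrown through methods that only declare IOException, which is how StripedDataStreamer.ConcurrentPoll uses it. A minimal sketch; the sleepQuietly helper and the re-interrupt are illustrative, not part of the patch:

import java.io.IOException;

import org.apache.hadoop.hdfs.DFSUtil;

public class InterruptedIOExample {
  /** Sleep, converting interruption into an IOException subtype callers can handle. */
  static void sleepQuietly(long millis) throws IOException {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt(); // keep the interrupt status visible
      throw DFSUtil.toInterruptedIOException("sleep interrupted", ie);
    }
  }

  public static void main(String[] args) throws IOException {
    sleepQuietly(100);
    System.out.println("slept without interruption");
  }
}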
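The DistributedFileSystem and HdfsAdmin additions are the public entry points for erasure coding zones. The sketch below shows the intended call sequence against a running cluster whose fs.defaultFS points at HDFS; the /ec-data directory is made up, the first schema returned by getECSchemas() is chosen arbitrarily, and HdfsConstants.BLOCK_STRIPED_CELL_SIZE (already used by StripedDataStreamer) is assumed to be an acceptable cell size:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.erasurecode.ECSchema;

public class ErasureCodingZoneExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    HdfsAdmin admin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);

    // Pick one of the schemas the cluster advertises.
    ECSchema schema = admin.getECSchemas()[0];

    // Create a directory and turn it into an erasure coding zone.
    Path dir = new Path("/ec-data");
    dfs.mkdirs(dir);
    dfs.createErasureCodingZone(dir, schema, HdfsConstants.BLOCK_STRIPED_CELL_SIZE);

    // Files written under the zone are striped; read the zone info back.
    ErasureCodingZone zone = dfs.getErasureCodingZone(dir);
    System.out.println("EC zone for " + dir + ": " + zone);
  }
}

Per the javadoc above, passing a null schema selects the default, so getECSchemas() is only needed when a specific schema is wanted.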
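The ConcurrentPoll javadoc in StripedDataStreamer describes a "fastest caller does the work, everyone else reuses the result" pattern built on the coordinator's MultipleBlockingQueue. The stand-alone sketch below restates that pattern with plain JDK collections so the locking and retry flow is easier to follow; the class and method names are invented for illustration, and the sketch deliberately ignores MultipleBlockingQueue, failure handling, and the real RPC:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentPollSketch {

  /**
   * One queue per streamer index. The first thread that finds every slot empty
   * populates all of them in one shot; every other thread just polls its slot.
   */
  abstract static class FirstCallerPopulates<T> {
    private final List<Queue<T>> queues = new ArrayList<>();

    FirstCallerPopulates(int numSlots) {
      for (int i = 0; i < numSlots; i++) {
        queues.add(new ConcurrentLinkedQueue<T>());
      }
    }

    /** Fill every slot exactly once per round, e.g. with the result of one RPC. */
    abstract void populate(List<Queue<T>> queues) throws IOException;

    T poll(int index) throws IOException {
      for (;;) {
        synchronized (queues) {
          T polled = queues.get(index).poll();
          if (polled != null) {
            return polled;        // another caller already populated this round
          }
          if (allEmpty()) {
            populate(queues);     // this caller is the "fastest"; do the work once
            return queues.get(index).poll();
          }
        }
        try {
          Thread.sleep(100);      // a round is still being consumed; retry later
        } catch (InterruptedException ie) {
          throw new IOException("poll interrupted", ie);
        }
      }
    }

    private boolean allEmpty() {
      for (Queue<T> q : queues) {
        if (!q.isEmpty()) {
          return false;
        }
      }
      return true;
    }
  }

  public static void main(String[] args) {
    final FirstCallerPopulates<String> shared = new FirstCallerPopulates<String>(3) {
      @Override
      void populate(List<Queue<String>> queues) {
        // Stand-in for the single namenode RPC whose result is split per index.
        for (int i = 0; i < queues.size(); i++) {
          queues.get(i).offer("result-" + i);
        }
      }
    };
    for (int i = 0; i < 3; i++) {
      final int index = i;
      new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            System.out.println("streamer " + index + " got " + shared.poll(index));
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
      }).start();
    }
  }
}

In the real streamer, populate() issues the single namenode call (for example locateFollowingBlock or updateBlockForPipeline) and offers one parsed internal block per streamer index.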
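Finally, DfsClientConf now reads HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY and rejects non-positive values with Preconditions.checkArgument; the resulting pool sizes the client-side threads used for striped reads. A small sketch of setting the value and reading it back; the pool size 20 is arbitrary, and the org.apache.hadoop.hdfs.client package for HdfsClientConfigKeys is assumed from the surrounding client code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;

public class StripedReadConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Size the client-side thread pool used for parallel striped reads.
    conf.setInt(HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY, 20);
    DfsClientConf clientConf = new DfsClientConf(conf);
    System.out.println("striped read threads = "
        + clientConf.getStripedReadThreadpoolSize());

    // Non-positive values fail fast in the DfsClientConf constructor.
    conf.setInt(HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY, 0);
    try {
      new DfsClientConf(conf);
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}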
