http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 0000000,0000000..69105a0 new file mode 100644 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@@ -1,0 -1,0 +1,972 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++package org.apache.hadoop.hdfs; ++ ++import com.google.common.base.Preconditions; ++import org.apache.hadoop.fs.ChecksumException; ++import org.apache.hadoop.fs.ReadOption; ++import org.apache.hadoop.hdfs.protocol.DatanodeInfo; ++import org.apache.hadoop.hdfs.protocol.ExtendedBlock; ++import org.apache.hadoop.hdfs.protocol.LocatedBlock; ++import org.apache.hadoop.hdfs.protocol.LocatedBlocks; ++import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; ++import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; ++import org.apache.hadoop.hdfs.util.StripedBlockUtil; ++import org.apache.hadoop.io.ByteBufferPool; ++ ++import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; ++import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; ++import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; ++ ++import org.apache.hadoop.io.IOUtils; ++import org.apache.hadoop.io.erasurecode.CodecUtil; ++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; ++ ++import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; ++import org.apache.hadoop.util.DirectBufferPool; ++ ++import java.io.EOFException; ++import java.io.IOException; ++import java.io.InterruptedIOException; ++import java.nio.ByteBuffer; ++import java.util.ArrayList; ++import java.util.Arrays; ++import java.util.Collections; ++import java.util.EnumSet; ++import java.util.List; ++import java.util.Set; ++import java.util.Collection; ++import java.util.Map; ++import java.util.HashMap; ++import java.util.concurrent.CompletionService; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.ExecutorCompletionService; ++import java.util.concurrent.Callable; ++import java.util.concurrent.Future; ++ ++/** ++ * DFSStripedInputStream reads from striped block groups ++ */ ++public class DFSStripedInputStream extends DFSInputStream { ++ ++ private static class ReaderRetryPolicy { ++ private int 
fetchEncryptionKeyTimes = 1; ++ private int fetchTokenTimes = 1; ++ ++ void refetchEncryptionKey() { ++ fetchEncryptionKeyTimes--; ++ } ++ ++ void refetchToken() { ++ fetchTokenTimes--; ++ } ++ ++ boolean shouldRefetchEncryptionKey() { ++ return fetchEncryptionKeyTimes > 0; ++ } ++ ++ boolean shouldRefetchToken() { ++ return fetchTokenTimes > 0; ++ } ++ } ++ ++ /** Used to indicate the buffered data's range in the block group */ ++ private static class StripeRange { ++ /** start offset in the block group (inclusive) */ ++ final long offsetInBlock; ++ /** length of the stripe range */ ++ final long length; ++ ++ StripeRange(long offsetInBlock, long length) { ++ Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0); ++ this.offsetInBlock = offsetInBlock; ++ this.length = length; ++ } ++ ++ boolean include(long pos) { ++ return pos >= offsetInBlock && pos < offsetInBlock + length; ++ } ++ } ++ ++ private static class BlockReaderInfo { ++ final BlockReader reader; ++ final DatanodeInfo datanode; ++ /** ++ * when initializing block readers, their starting offsets are set to the same ++ * number: the smallest internal block offsets among all the readers. This is ++ * because it is possible that for some internal blocks we have to read ++ * "backwards" for decoding purpose. We thus use this offset array to track ++ * offsets for all the block readers so that we can skip data if necessary. ++ */ ++ long blockReaderOffset; ++ /** ++ * We use this field to indicate whether we should use this reader. In case ++ * we hit any issue with this reader, we set this field to true and avoid ++ * using it for the next stripe. 
++ */ ++ boolean shouldSkip = false; ++ ++ BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) { ++ this.reader = reader; ++ this.datanode = dn; ++ this.blockReaderOffset = offset; ++ } ++ ++ void setOffset(long offset) { ++ this.blockReaderOffset = offset; ++ } ++ ++ void skip() { ++ this.shouldSkip = true; ++ } ++ } ++ ++ private static final DirectBufferPool bufferPool = new DirectBufferPool(); ++ ++ private final BlockReaderInfo[] blockReaders; ++ private final int cellSize; ++ private final short dataBlkNum; ++ private final short parityBlkNum; ++ private final int groupSize; ++ /** the buffer for a complete stripe */ ++ private ByteBuffer curStripeBuf; ++ private ByteBuffer parityBuf; ++ private final ErasureCodingPolicy ecPolicy; ++ private final RawErasureDecoder decoder; ++ ++ /** ++ * indicate the start/end offset of the current buffered stripe in the ++ * block group ++ */ ++ private StripeRange curStripeRange; ++ private final CompletionService<Void> readingService; ++ ++ /** ++ * When warning the user of a lost block in striping mode, we remember the ++ * dead nodes we've logged. All other striping blocks on these nodes can be ++ * considered lost too, and we don't want to log a warning for each of them. ++ * This is to prevent the log from being too verbose. Refer to HDFS-8920. 
++ * ++ * To minimize the overhead, we only store the datanodeUuid in this set ++ */ ++ private final Set<String> warnedNodes = Collections.newSetFromMap( ++ new ConcurrentHashMap<String, Boolean>()); ++ ++ DFSStripedInputStream(DFSClient dfsClient, String src, ++ boolean verifyChecksum, ErasureCodingPolicy ecPolicy, ++ LocatedBlocks locatedBlocks) throws IOException { ++ super(dfsClient, src, verifyChecksum, locatedBlocks); ++ ++ assert ecPolicy != null; ++ this.ecPolicy = ecPolicy; ++ this.cellSize = ecPolicy.getCellSize(); ++ dataBlkNum = (short) ecPolicy.getNumDataUnits(); ++ parityBlkNum = (short) ecPolicy.getNumParityUnits(); ++ groupSize = dataBlkNum + parityBlkNum; ++ blockReaders = new BlockReaderInfo[groupSize]; ++ curStripeRange = new StripeRange(0, 0); ++ readingService = ++ new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool()); ++ decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(), ++ dataBlkNum, parityBlkNum); ++ if (DFSClient.LOG.isDebugEnabled()) { ++ DFSClient.LOG.debug("Creating an striped input stream for file " + src); ++ } ++ } ++ ++ private void resetCurStripeBuffer() { ++ if (curStripeBuf == null) { ++ curStripeBuf = bufferPool.getBuffer(cellSize * dataBlkNum); ++ } ++ curStripeBuf.clear(); ++ curStripeRange = new StripeRange(0, 0); ++ } ++ ++ private ByteBuffer getParityBuffer() { ++ if (parityBuf == null) { ++ parityBuf = bufferPool.getBuffer(cellSize * parityBlkNum); ++ } ++ parityBuf.clear(); ++ return parityBuf; ++ } ++ ++ /** ++ * When seeking into a new block group, create blockReader for each internal ++ * block in the group. ++ */ ++ private synchronized void blockSeekTo(long target) throws IOException { ++ if (target >= getFileLength()) { ++ throw new IOException("Attempted to read past end of file"); ++ } ++ ++ // Will be getting a new BlockReader. 
++ closeCurrentBlockReaders(); ++ ++ // Compute desired striped block group ++ LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target); ++ // Update current position ++ this.pos = target; ++ this.blockEnd = targetBlockGroup.getStartOffset() + ++ targetBlockGroup.getBlockSize() - 1; ++ currentLocatedBlock = targetBlockGroup; ++ } ++ ++ @Override ++ public synchronized void close() throws IOException { ++ super.close(); ++ if (curStripeBuf != null) { ++ bufferPool.returnBuffer(curStripeBuf); ++ curStripeBuf = null; ++ } ++ if (parityBuf != null) { ++ bufferPool.returnBuffer(parityBuf); ++ parityBuf = null; ++ } ++ } ++ ++ /** ++ * Extend the super method with the logic of switching between cells. ++ * When reaching the end of a cell, proceed to the next cell and read it ++ * with the next blockReader. ++ */ ++ @Override ++ protected void closeCurrentBlockReaders() { ++ resetCurStripeBuffer(); ++ if (blockReaders == null || blockReaders.length == 0) { ++ return; ++ } ++ for (int i = 0; i < groupSize; i++) { ++ closeReader(blockReaders[i]); ++ blockReaders[i] = null; ++ } ++ blockEnd = -1; ++ } ++ ++ private void closeReader(BlockReaderInfo readerInfo) { ++ if (readerInfo != null) { ++// IOUtils.cleanup(null, readerInfo.reader); ++ readerInfo.skip(); ++ } ++ } ++ ++ private long getOffsetInBlockGroup() { ++ return getOffsetInBlockGroup(pos); ++ } ++ ++ private long getOffsetInBlockGroup(long pos) { ++ return pos - currentLocatedBlock.getStartOffset(); ++ } ++ ++ /** ++ * Read a new stripe covering the current position, and store the data in the ++ * {@link #curStripeBuf}. 
++ */ ++ private void readOneStripe( ++ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) ++ throws IOException { ++ resetCurStripeBuffer(); ++ ++ // compute stripe range based on pos ++ final long offsetInBlockGroup = getOffsetInBlockGroup(); ++ final long stripeLen = cellSize * dataBlkNum; ++ final int stripeIndex = (int) (offsetInBlockGroup / stripeLen); ++ final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen); ++ final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize() ++ - (stripeIndex * stripeLen), stripeLen); ++ StripeRange stripeRange = new StripeRange(offsetInBlockGroup, ++ stripeLimit - stripeBufOffset); ++ ++ LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock; ++ AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy, cellSize, ++ blockGroup, offsetInBlockGroup, ++ offsetInBlockGroup + stripeRange.length - 1, curStripeBuf); ++ final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( ++ blockGroup, cellSize, dataBlkNum, parityBlkNum); ++ // read the whole stripe ++ for (AlignedStripe stripe : stripes) { ++ // Parse group to get chosen DN location ++ StripeReader sreader = new StatefulStripeReader(readingService, stripe, ++ blks, blockReaders, corruptedBlockMap); ++ sreader.readStripe(); ++ } ++ curStripeBuf.position(stripeBufOffset); ++ curStripeBuf.limit(stripeLimit); ++ curStripeRange = stripeRange; ++ } ++ ++ private Callable<Void> readCells(final BlockReader reader, ++ final DatanodeInfo datanode, final long currentReaderOffset, ++ final long targetReaderOffset, final ByteBufferStrategy[] strategies, ++ final ExtendedBlock currentBlock, ++ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { ++ return new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ // reader can be null if getBlockReaderWithRetry failed or ++ // the reader hit exception before ++ if (reader == null) { ++ throw new IOException("The BlockReader is null. 
" + ++ "The BlockReader creation failed or the reader hit exception."); ++ } ++ Preconditions.checkState(currentReaderOffset <= targetReaderOffset); ++ if (currentReaderOffset < targetReaderOffset) { ++ long skipped = reader.skip(targetReaderOffset - currentReaderOffset); ++ Preconditions.checkState( ++ skipped == targetReaderOffset - currentReaderOffset); ++ } ++ int result = 0; ++ for (ByteBufferStrategy strategy : strategies) { ++ result += readToBuffer(reader, datanode, strategy, currentBlock, ++ corruptedBlockMap); ++ } ++ return null; ++ } ++ }; ++ } ++ ++ private int readToBuffer(BlockReader blockReader, ++ DatanodeInfo currentNode, ByteBufferStrategy strategy, ++ ExtendedBlock currentBlock, ++ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) ++ throws IOException { ++ final int targetLength = strategy.buf.remaining(); ++ int length = 0; ++ try { ++ while (length < targetLength) { ++ int ret = strategy.doRead(blockReader, 0, 0); ++ if (ret < 0) { ++ throw new IOException("Unexpected EOS from the reader"); ++ } ++ length += ret; ++ } ++ return length; ++ } catch (ChecksumException ce) { ++ DFSClient.LOG.warn("Found Checksum error for " ++ + currentBlock + " from " + currentNode ++ + " at " + ce.getPos()); ++ // we want to remember which block replicas we have tried ++ addIntoCorruptedBlockMap(currentBlock, currentNode, ++ corruptedBlockMap); ++ throw ce; ++ } catch (IOException e) { ++ DFSClient.LOG.warn("Exception while reading from " ++ + currentBlock + " of " + src + " from " ++ + currentNode, e); ++ throw e; ++ } ++ } ++ ++ /** ++ * Seek to a new arbitrary location ++ */ ++ @Override ++ public synchronized void seek(long targetPos) throws IOException { ++ if (targetPos > getFileLength()) { ++ throw new EOFException("Cannot seek after EOF"); ++ } ++ if (targetPos < 0) { ++ throw new EOFException("Cannot seek to negative offset"); ++ } ++ if (closed.get()) { ++ throw new IOException("Stream is closed!"); ++ } ++ if (targetPos <= blockEnd) { ++ final 
long targetOffsetInBlk = getOffsetInBlockGroup(targetPos); ++ if (curStripeRange.include(targetOffsetInBlk)) { ++ int bufOffset = getStripedBufOffset(targetOffsetInBlk); ++ curStripeBuf.position(bufOffset); ++ pos = targetPos; ++ return; ++ } ++ } ++ pos = targetPos; ++ blockEnd = -1; ++ } ++ ++ private int getStripedBufOffset(long offsetInBlockGroup) { ++ final long stripeLen = cellSize * dataBlkNum; ++ // compute the position in the curStripeBuf based on "pos" ++ return (int) (offsetInBlockGroup % stripeLen); ++ } ++ ++ @Override ++ public synchronized boolean seekToNewSource(long targetPos) ++ throws IOException { ++ return false; ++ } ++ ++ @Override ++ protected synchronized int readWithStrategy(ReaderStrategy strategy, ++ int off, int len) throws IOException { ++ dfsClient.checkOpen(); ++ if (closed.get()) { ++ throw new IOException("Stream closed"); ++ } ++ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap = ++ new ConcurrentHashMap<>(); ++ if (pos < getFileLength()) { ++ try { ++ if (pos > blockEnd) { ++ blockSeekTo(pos); ++ } ++ int realLen = (int) Math.min(len, (blockEnd - pos + 1L)); ++ synchronized (infoLock) { ++ if (locatedBlocks.isLastBlockComplete()) { ++ realLen = (int) Math.min(realLen, ++ locatedBlocks.getFileLength() - pos); ++ } ++ } ++ ++ /** Number of bytes already read into buffer */ ++ int result = 0; ++ while (result < realLen) { ++ if (!curStripeRange.include(getOffsetInBlockGroup())) { ++ readOneStripe(corruptedBlockMap); ++ } ++ int ret = copyToTargetBuf(strategy, off + result, realLen - result); ++ result += ret; ++ pos += ret; ++ } ++ if (dfsClient.stats != null) { ++ dfsClient.stats.incrementBytesRead(result); ++ } ++ return result; ++ } finally { ++ // Check if need to report block replicas corruption either read ++ // was successful or ChecksumException occured. 
++ reportCheckSumFailure(corruptedBlockMap, ++ currentLocatedBlock.getLocations().length); ++ } ++ } ++ return -1; ++ } ++ ++ /** ++ * Copy the data from {@link #curStripeBuf} into the given buffer ++ * @param strategy the ReaderStrategy containing the given buffer ++ * @param offset the offset of the given buffer. Used only when strategy is ++ * a ByteArrayStrategy ++ * @param length target length ++ * @return number of bytes copied ++ */ ++ private int copyToTargetBuf(ReaderStrategy strategy, int offset, int length) { ++ final long offsetInBlk = getOffsetInBlockGroup(); ++ int bufOffset = getStripedBufOffset(offsetInBlk); ++ curStripeBuf.position(bufOffset); ++ return strategy.copyFrom(curStripeBuf, offset, ++ Math.min(length, curStripeBuf.remaining())); ++ } ++ ++ /** ++ * The super method {@link DFSInputStream#refreshLocatedBlock} refreshes ++ * cached LocatedBlock by executing {@link DFSInputStream#getBlockAt} again. ++ * This method extends the logic by first remembering the index of the ++ * internal block, and re-parsing the refreshed block group with the same ++ * index. ++ */ ++ @Override ++ protected LocatedBlock refreshLocatedBlock(LocatedBlock block) ++ throws IOException { ++ int idx = StripedBlockUtil.getBlockIndex(block.getBlock().getLocalBlock()); ++ LocatedBlock lb = getBlockGroupAt(block.getStartOffset()); ++ // If indexing information is returned, iterate through the index array ++ // to find the entry for position idx in the group ++ LocatedStripedBlock lsb = (LocatedStripedBlock) lb; ++ int i = 0; ++ for (; i < lsb.getBlockIndices().length; i++) { ++ if (lsb.getBlockIndices()[i] == idx) { ++ break; ++ } ++ } ++ if (DFSClient.LOG.isDebugEnabled()) { ++ DFSClient.LOG.debug("refreshLocatedBlock for striped blocks, offset=" ++ + block.getStartOffset() + ". 
Obtained block " + lb + ", idx=" + idx); ++ } ++ return StripedBlockUtil.constructInternalBlock( ++ lsb, i, cellSize, dataBlkNum, idx); ++ } ++ ++ private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException { ++ LocatedBlock lb = super.getBlockAt(offset); ++ assert lb instanceof LocatedStripedBlock : "NameNode" + ++ " should return a LocatedStripedBlock for a striped file"; ++ return (LocatedStripedBlock)lb; ++ } ++ ++ /** ++ * Real implementation of pread. ++ */ ++ @Override ++ protected void fetchBlockByteRange(LocatedBlock block, long start, ++ long end, byte[] buf, int offset, ++ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) ++ throws IOException { ++ // Refresh the striped block group ++ LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset()); ++ ++ AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes( ++ ecPolicy, cellSize, blockGroup, start, end, buf, offset); ++ CompletionService<Void> readService = new ExecutorCompletionService<>( ++ dfsClient.getStripedReadsThreadPool()); ++ final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( ++ blockGroup, cellSize, dataBlkNum, parityBlkNum); ++ final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize]; ++ try { ++ for (AlignedStripe stripe : stripes) { ++ // Parse group to get chosen DN location ++ StripeReader preader = new PositionStripeReader(readService, stripe, ++ blks, preaderInfos, corruptedBlockMap); ++ preader.readStripe(); ++ } ++ } finally { ++ for (BlockReaderInfo preaderInfo : preaderInfos) { ++ closeReader(preaderInfo); ++ } ++ } ++ } ++ ++ @Override ++ protected void reportLostBlock(LocatedBlock lostBlock, ++ Collection<DatanodeInfo> ignoredNodes) { ++ DatanodeInfo[] nodes = lostBlock.getLocations(); ++ if (nodes != null && nodes.length > 0) { ++ List<String> dnUUIDs = new ArrayList<>(); ++ for (DatanodeInfo node : nodes) { ++ dnUUIDs.add(node.getDatanodeUuid()); ++ } ++ if (!warnedNodes.containsAll(dnUUIDs)) { ++ 
DFSClient.LOG.warn(Arrays.toString(nodes) + " are unavailable and " + ++ "all striping blocks on them are lost. " + ++ "IgnoredNodes = " + ignoredNodes); ++ warnedNodes.addAll(dnUUIDs); ++ } ++ } else { ++ super.reportLostBlock(lostBlock, ignoredNodes); ++ } ++ } ++ ++ /** ++ * The reader for reading a complete {@link AlignedStripe}. Note that an ++ * {@link AlignedStripe} may cross multiple stripes with cellSize width. ++ */ ++ private abstract class StripeReader { ++ final Map<Future<Void>, Integer> futures = new HashMap<>(); ++ final AlignedStripe alignedStripe; ++ final CompletionService<Void> service; ++ final LocatedBlock[] targetBlocks; ++ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap; ++ final BlockReaderInfo[] readerInfos; ++ ++ StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe, ++ LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos, ++ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { ++ this.service = service; ++ this.alignedStripe = alignedStripe; ++ this.targetBlocks = targetBlocks; ++ this.readerInfos = readerInfos; ++ this.corruptedBlockMap = corruptedBlockMap; ++ } ++ ++ /** prepare all the data chunks */ ++ abstract void prepareDecodeInputs(); ++ ++ /** prepare the parity chunk and block reader if necessary */ ++ abstract boolean prepareParityChunk(int index) throws IOException; ++ ++ abstract void decode(); ++ ++ void updateState4SuccessRead(StripingChunkReadResult result) { ++ Preconditions.checkArgument( ++ result.state == StripingChunkReadResult.SUCCESSFUL); ++ readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock() ++ + alignedStripe.getSpanInBlock()); ++ } ++ ++ private void checkMissingBlocks() throws IOException { ++ if (alignedStripe.missingChunksNum > parityBlkNum) { ++ clearFutures(futures.keySet()); ++ throw new IOException(alignedStripe.missingChunksNum ++ + " missing blocks, the stripe is: " + alignedStripe); ++ } ++ } ++ ++ /** ++ * We need decoding. 
Thus go through all the data chunks and make sure we ++ * submit read requests for all of them. ++ */ ++ private void readDataForDecoding() throws IOException { ++ prepareDecodeInputs(); ++ for (int i = 0; i < dataBlkNum; i++) { ++ Preconditions.checkNotNull(alignedStripe.chunks[i]); ++ if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) { ++ if (!readChunk(targetBlocks[i], i)) { ++ alignedStripe.missingChunksNum++; ++ } ++ } ++ } ++ checkMissingBlocks(); ++ } ++ ++ void readParityChunks(int num) throws IOException { ++ for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num; ++ i++) { ++ if (alignedStripe.chunks[i] == null) { ++ if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) { ++ j++; ++ } else { ++ alignedStripe.missingChunksNum++; ++ } ++ } ++ } ++ checkMissingBlocks(); ++ } ++ ++ boolean createBlockReader(LocatedBlock block, int chunkIndex) ++ throws IOException { ++ BlockReader reader = null; ++ final ReaderRetryPolicy retry = new ReaderRetryPolicy(); ++ DNAddrPair dnInfo = new DNAddrPair(null, null, null); ++ ++ while(true) { ++ try { ++ // the cached block location might have been re-fetched, so always ++ // get it from cache. 
++ block = refreshLocatedBlock(block); ++ targetBlocks[chunkIndex] = block; ++ ++ // internal block has one location, just rule out the deadNodes ++ dnInfo = getBestNodeDNAddrPair(block, null); ++ if (dnInfo == null) { ++ break; ++ } ++ reader = getBlockReader(block, alignedStripe.getOffsetInBlock(), ++ block.getBlockSize() - alignedStripe.getOffsetInBlock(), ++ dnInfo.addr, dnInfo.storageType, dnInfo.info); ++ } catch (IOException e) { ++ if (e instanceof InvalidEncryptionKeyException && ++ retry.shouldRefetchEncryptionKey()) { ++ DFSClient.LOG.info("Will fetch a new encryption key and retry, " ++ + "encryption key was invalid when connecting to " + dnInfo.addr ++ + " : " + e); ++ dfsClient.clearDataEncryptionKey(); ++ retry.refetchEncryptionKey(); ++ } else if (retry.shouldRefetchToken() && ++ tokenRefetchNeeded(e, dnInfo.addr)) { ++ fetchBlockAt(block.getStartOffset()); ++ retry.refetchToken(); ++ } else { ++ //TODO: handles connection issues ++ DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " + ++ "block" + block.getBlock(), e); ++ // re-fetch the block in case the block has been moved ++ fetchBlockAt(block.getStartOffset()); ++ addToDeadNodes(dnInfo.info); ++ } ++ } ++ if (reader != null) { ++ readerInfos[chunkIndex] = new BlockReaderInfo(reader, dnInfo.info, ++ alignedStripe.getOffsetInBlock()); ++ return true; ++ } ++ } ++ return false; ++ } ++ ++ private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) { ++ if (chunk.byteBuffer != null) { ++ ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer); ++ return new ByteBufferStrategy[]{strategy}; ++ } else { ++ ByteBufferStrategy[] strategies = ++ new ByteBufferStrategy[chunk.byteArray.getOffsets().length]; ++ for (int i = 0; i < strategies.length; i++) { ++ ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(), ++ chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]); ++ strategies[i] = new ByteBufferStrategy(buffer); ++ } ++ return strategies; ++ } 
++ } ++ ++ boolean readChunk(final LocatedBlock block, int chunkIndex) ++ throws IOException { ++ final StripingChunk chunk = alignedStripe.chunks[chunkIndex]; ++ if (block == null) { ++ chunk.state = StripingChunk.MISSING; ++ return false; ++ } ++ if (readerInfos[chunkIndex] == null) { ++ if (!createBlockReader(block, chunkIndex)) { ++ chunk.state = StripingChunk.MISSING; ++ return false; ++ } ++ } else if (readerInfos[chunkIndex].shouldSkip) { ++ chunk.state = StripingChunk.MISSING; ++ return false; ++ } ++ ++ chunk.state = StripingChunk.PENDING; ++ Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader, ++ readerInfos[chunkIndex].datanode, ++ readerInfos[chunkIndex].blockReaderOffset, ++ alignedStripe.getOffsetInBlock(), getReadStrategies(chunk), ++ block.getBlock(), corruptedBlockMap); ++ ++ Future<Void> request = service.submit(readCallable); ++ futures.put(request, chunkIndex); ++ return true; ++ } ++ ++ /** read the whole stripe. do decoding if necessary */ ++ void readStripe() throws IOException { ++ for (int i = 0; i < dataBlkNum; i++) { ++ if (alignedStripe.chunks[i] != null && ++ alignedStripe.chunks[i].state != StripingChunk.ALLZERO) { ++ if (!readChunk(targetBlocks[i], i)) { ++ alignedStripe.missingChunksNum++; ++ } ++ } ++ } ++ // There are missing block locations at this stage. Thus we need to read ++ // the full stripe and one more parity block. 
++ if (alignedStripe.missingChunksNum > 0) { ++ checkMissingBlocks(); ++ readDataForDecoding(); ++ // read parity chunks ++ readParityChunks(alignedStripe.missingChunksNum); ++ } ++ // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks ++ ++ // Input buffers for potential decode operation, which remains null until ++ // first read failure ++ while (!futures.isEmpty()) { ++ try { ++ StripingChunkReadResult r = StripedBlockUtil ++ .getNextCompletedStripedRead(service, futures, 0); ++ if (DFSClient.LOG.isDebugEnabled()) { ++ DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " ++ + alignedStripe); ++ } ++ StripingChunk returnedChunk = alignedStripe.chunks[r.index]; ++ Preconditions.checkNotNull(returnedChunk); ++ Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING); ++ ++ if (r.state == StripingChunkReadResult.SUCCESSFUL) { ++ returnedChunk.state = StripingChunk.FETCHED; ++ alignedStripe.fetchedChunksNum++; ++ updateState4SuccessRead(r); ++ if (alignedStripe.fetchedChunksNum == dataBlkNum) { ++ clearFutures(futures.keySet()); ++ break; ++ } ++ } else { ++ returnedChunk.state = StripingChunk.MISSING; ++ // close the corresponding reader ++ closeReader(readerInfos[r.index]); ++ ++ final int missing = alignedStripe.missingChunksNum; ++ alignedStripe.missingChunksNum++; ++ checkMissingBlocks(); ++ ++ readDataForDecoding(); ++ readParityChunks(alignedStripe.missingChunksNum - missing); ++ } ++ } catch (InterruptedException ie) { ++ String err = "Read request interrupted"; ++ DFSClient.LOG.error(err); ++ clearFutures(futures.keySet()); ++ // Don't decode if read interrupted ++ throw new InterruptedIOException(err); ++ } ++ } ++ ++ if (alignedStripe.missingChunksNum > 0) { ++ decode(); ++ } ++ } ++ } ++ ++ class PositionStripeReader extends StripeReader { ++ private byte[][] decodeInputs = null; ++ ++ PositionStripeReader(CompletionService<Void> service, ++ AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, ++ 
BlockReaderInfo[] readerInfos, ++ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { ++ super(service, alignedStripe, targetBlocks, readerInfos, ++ corruptedBlockMap); ++ } ++ ++ @Override ++ void prepareDecodeInputs() { ++ if (decodeInputs == null) { ++ decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe, ++ dataBlkNum, parityBlkNum); ++ } ++ } ++ ++ @Override ++ boolean prepareParityChunk(int index) { ++ Preconditions.checkState(index >= dataBlkNum && ++ alignedStripe.chunks[index] == null); ++ final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index, ++ dataBlkNum, parityBlkNum); ++ alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]); ++ alignedStripe.chunks[index].addByteArraySlice(0, ++ (int) alignedStripe.getSpanInBlock()); ++ return true; ++ } ++ ++ @Override ++ void decode() { ++ StripedBlockUtil.finalizeDecodeInputs(decodeInputs, dataBlkNum, ++ parityBlkNum, alignedStripe); ++ StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe, ++ dataBlkNum, parityBlkNum, decoder); ++ } ++ } ++ ++ class StatefulStripeReader extends StripeReader { ++ ByteBuffer[] decodeInputs; ++ ++ StatefulStripeReader(CompletionService<Void> service, ++ AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, ++ BlockReaderInfo[] readerInfos, ++ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { ++ super(service, alignedStripe, targetBlocks, readerInfos, ++ corruptedBlockMap); ++ } ++ ++ @Override ++ void prepareDecodeInputs() { ++ if (decodeInputs == null) { ++ decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum]; ++ final ByteBuffer cur; ++ synchronized (DFSStripedInputStream.this) { ++ cur = curStripeBuf.duplicate(); ++ } ++ StripedBlockUtil.VerticalRange range = alignedStripe.range; ++ for (int i = 0; i < dataBlkNum; i++) { ++ cur.limit(cur.capacity()); ++ int pos = (int) (range.offsetInBlock % cellSize + cellSize * i); ++ cur.position(pos); ++ cur.limit((int) (pos + range.spanInBlock)); ++ final int 
decodeIndex = StripedBlockUtil.convertIndex4Decode(i, ++ dataBlkNum, parityBlkNum); ++ decodeInputs[decodeIndex] = cur.slice(); ++ if (alignedStripe.chunks[i] == null) { ++ alignedStripe.chunks[i] = new StripingChunk( ++ decodeInputs[decodeIndex]); ++ } ++ } ++ } ++ } ++ ++ @Override ++ boolean prepareParityChunk(int index) throws IOException { ++ Preconditions.checkState(index >= dataBlkNum ++ && alignedStripe.chunks[index] == null); ++ if (blockReaders[index] != null && blockReaders[index].shouldSkip) { ++ alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING); ++ // we have failed the block reader before ++ return false; ++ } ++ final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index, ++ dataBlkNum, parityBlkNum); ++ ByteBuffer buf = getParityBuffer().duplicate(); ++ buf.position(cellSize * decodeIndex); ++ buf.limit(cellSize * decodeIndex + (int) alignedStripe.range.spanInBlock); ++ decodeInputs[decodeIndex] = buf.slice(); ++ alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]); ++ return true; ++ } ++ ++ @Override ++ void decode() { ++ // TODO no copy for data chunks. 
this depends on HADOOP-12047 ++ final int span = (int) alignedStripe.getSpanInBlock(); ++ for (int i = 0; i < alignedStripe.chunks.length; i++) { ++ final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i, ++ dataBlkNum, parityBlkNum); ++ if (alignedStripe.chunks[i] != null && ++ alignedStripe.chunks[i].state == StripingChunk.ALLZERO) { ++ for (int j = 0; j < span; j++) { ++ decodeInputs[decodeIndex].put((byte) 0); ++ } ++ decodeInputs[decodeIndex].flip(); ++ } else if (alignedStripe.chunks[i] != null && ++ alignedStripe.chunks[i].state == StripingChunk.FETCHED) { ++ decodeInputs[decodeIndex].position(0); ++ decodeInputs[decodeIndex].limit(span); ++ } ++ } ++ int[] decodeIndices = new int[parityBlkNum]; ++ int pos = 0; ++ for (int i = 0; i < alignedStripe.chunks.length; i++) { ++ if (alignedStripe.chunks[i] != null && ++ alignedStripe.chunks[i].state == StripingChunk.MISSING) { ++ int decodeIndex = StripedBlockUtil.convertIndex4Decode(i, ++ dataBlkNum, parityBlkNum); ++ if (i < dataBlkNum) { ++ decodeIndices[pos++] = decodeIndex; ++ } else { ++ decodeInputs[decodeIndex] = null; ++ } ++ } ++ } ++ decodeIndices = Arrays.copyOf(decodeIndices, pos); ++ ++ final int decodeChunkNum = decodeIndices.length; ++ ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum]; ++ for (int i = 0; i < decodeChunkNum; i++) { ++ outputs[i] = decodeInputs[decodeIndices[i]]; ++ outputs[i].position(0); ++ outputs[i].limit((int) alignedStripe.range.spanInBlock); ++ decodeInputs[decodeIndices[i]] = null; ++ } ++ ++ decoder.decode(decodeInputs, decodeIndices, outputs); ++ } ++ } ++ ++ /** ++ * May need online read recovery, zero-copy read doesn't make ++ * sense, so don't support it. 
*/
  @Override
  public synchronized ByteBuffer read(ByteBufferPool bufferPool,
      int maxLength, EnumSet<ReadOption> opts)
          throws IOException, UnsupportedOperationException {
    // Unconditionally rejected: no buffer is ever handed out by this method.
    throw new UnsupportedOperationException(
        "Not support enhanced byte buffer access.");
  }

  /**
   * Counterpart of the rejected byte-buffer read above; always throws
   * {@link UnsupportedOperationException}.
   *
   * @param buffer not used
   */
  @Override
  public synchronized void releaseBuffer(ByteBuffer buffer) {
    throw new UnsupportedOperationException(
        "Not support enhanced byte buffer access.");
  }

  /**
   * A variation to {@link DFSInputStream#cancelAll}: requests cancellation of
   * every future in the collection and then empties the collection.
   *
   * @param futures the futures to cancel; the collection is cleared on return
   */
  private void clearFutures(Collection<Future<Void>> futures) {
    for (Future<Void> future : futures) {
      // cancel(false): a task that is already running is not interrupted
      future.cancel(false);
    }
    futures.clear();
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 0000000,0000000..bf4e10e new file mode 100644 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@@ -1,0 -1,0 +1,953 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++package org.apache.hadoop.hdfs; ++ ++import java.io.IOException; ++import java.io.InterruptedIOException; ++import java.nio.ByteBuffer; ++import java.nio.channels.ClosedChannelException; ++import java.util.ArrayList; ++import java.util.Arrays; ++import java.util.Collections; ++import java.util.EnumSet; ++import java.util.HashMap; ++import java.util.HashSet; ++import java.util.List; ++import java.util.Map; ++import java.util.Set; ++import java.util.concurrent.BlockingQueue; ++import java.util.concurrent.LinkedBlockingQueue; ++import java.util.concurrent.TimeUnit; ++ ++import org.apache.hadoop.HadoopIllegalArgumentException; ++import org.apache.hadoop.classification.InterfaceAudience; ++import org.apache.hadoop.fs.CreateFlag; ++import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; ++import org.apache.hadoop.hdfs.protocol.ClientProtocol; ++import org.apache.hadoop.hdfs.protocol.DatanodeID; ++import org.apache.hadoop.hdfs.protocol.DatanodeInfo; ++import org.apache.hadoop.hdfs.protocol.ExtendedBlock; ++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; ++import org.apache.hadoop.hdfs.protocol.LocatedBlock; ++import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; ++import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; ++import org.apache.hadoop.hdfs.util.StripedBlockUtil; ++import org.apache.hadoop.io.MultipleIOException; ++import org.apache.hadoop.io.erasurecode.CodecUtil; ++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; ++import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; ++import org.apache.hadoop.util.DataChecksum; ++import org.apache.hadoop.util.Progressable; ++import org.apache.hadoop.util.Time; ++ ++import com.google.common.base.Preconditions; ++import org.apache.htrace.core.TraceScope; ++ ++ ++/** ++ * This class supports writing files in striped layout and erasure coded format. ++ * Each stripe contains a sequence of cells. 
++ */ [email protected] ++public class DFSStripedOutputStream extends DFSOutputStream { ++ static class MultipleBlockingQueue<T> { ++ private final List<BlockingQueue<T>> queues; ++ ++ MultipleBlockingQueue(int numQueue, int queueSize) { ++ List<BlockingQueue<T>> list = new ArrayList<>(numQueue); ++ for (int i = 0; i < numQueue; i++) { ++ list.add(new LinkedBlockingQueue<T>(queueSize)); ++ } ++ queues = Collections.synchronizedList(list); ++ } ++ ++ void offer(int i, T object) { ++ final boolean b = queues.get(i).offer(object); ++ Preconditions.checkState(b, "Failed to offer " + object ++ + " to queue, i=" + i); ++ } ++ ++ T take(int i) throws InterruptedIOException { ++ try { ++ return queues.get(i).take(); ++ } catch(InterruptedException ie) { ++ throw DFSUtilClient.toInterruptedIOException("take interrupted, i=" + i, ie); ++ } ++ } ++ ++ T takeWithTimeout(int i) throws InterruptedIOException { ++ try { ++ return queues.get(i).poll(100, TimeUnit.MILLISECONDS); ++ } catch (InterruptedException e) { ++ throw DFSUtilClient.toInterruptedIOException("take interrupted, i=" + i, e); ++ } ++ } ++ ++ T poll(int i) { ++ return queues.get(i).poll(); ++ } ++ ++ T peek(int i) { ++ return queues.get(i).peek(); ++ } ++ ++ void clear() { ++ for (BlockingQueue<T> q : queues) { ++ q.clear(); ++ } ++ } ++ } ++ ++ /** Coordinate the communication between the streamers. */ ++ static class Coordinator { ++ /** ++ * The next internal block to write to for each streamers. The ++ * DFSStripedOutputStream makes the {@link ClientProtocol#addBlock} RPC to ++ * get a new block group. The block group is split to internal blocks, which ++ * are then distributed into the queue for streamers to retrieve. ++ */ ++ private final MultipleBlockingQueue<LocatedBlock> followingBlocks; ++ /** ++ * Used to sync among all the streamers before allocating a new block. The ++ * DFSStripedOutputStream uses this to make sure every streamer has finished ++ * writing the previous block. 
++ */ ++ private final MultipleBlockingQueue<ExtendedBlock> endBlocks; ++ ++ /** ++ * The following data structures are used for syncing while handling errors ++ */ ++ private final MultipleBlockingQueue<LocatedBlock> newBlocks; ++ private final Map<StripedDataStreamer, Boolean> updateStreamerMap; ++ private final MultipleBlockingQueue<Boolean> streamerUpdateResult; ++ ++ Coordinator(final int numAllBlocks) { ++ followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); ++ endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); ++ newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); ++ updateStreamerMap = Collections.synchronizedMap( ++ new HashMap<StripedDataStreamer, Boolean>(numAllBlocks)); ++ streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1); ++ } ++ ++ MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() { ++ return followingBlocks; ++ } ++ ++ MultipleBlockingQueue<LocatedBlock> getNewBlocks() { ++ return newBlocks; ++ } ++ ++ void offerEndBlock(int i, ExtendedBlock block) { ++ endBlocks.offer(i, block); ++ } ++ ++ void offerStreamerUpdateResult(int i, boolean success) { ++ streamerUpdateResult.offer(i, success); ++ } ++ ++ boolean takeStreamerUpdateResult(int i) throws InterruptedIOException { ++ return streamerUpdateResult.take(i); ++ } ++ ++ void updateStreamer(StripedDataStreamer streamer, ++ boolean success) { ++ assert !updateStreamerMap.containsKey(streamer); ++ updateStreamerMap.put(streamer, success); ++ } ++ ++ void clearFailureStates() { ++ newBlocks.clear(); ++ updateStreamerMap.clear(); ++ streamerUpdateResult.clear(); ++ } ++ } ++ ++ /** Buffers for writing the data and parity cells of a stripe. 
*/
  class CellBuffers {
    // one buffer per internal block (data blocks first, then parity blocks)
    private final ByteBuffer[] buffers;
    // one checksum array per parity block
    private final byte[][] checksumArrays;

    /**
     * Allocate one cell-sized buffer per internal block and one checksum
     * array per parity block.
     *
     * @param numParityBlocks number of parity blocks
     * @throws HadoopIllegalArgumentException if bytesPerChecksum does not
     *         divide the cell size
     */
    CellBuffers(int numParityBlocks) throws InterruptedException{
      if (cellSize % bytesPerChecksum != 0) {
        throw new HadoopIllegalArgumentException("Invalid values: "
            + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (="
            + bytesPerChecksum + ") must divide cell size (=" + cellSize + ").");
      }

      checksumArrays = new byte[numParityBlocks][];
      // one checksum per bytesPerChecksum-sized chunk of a cell
      final int size = getChecksumSize() * (cellSize / bytesPerChecksum);
      for (int i = 0; i < checksumArrays.length; i++) {
        checksumArrays[i] = new byte[size];
      }

      buffers = new ByteBuffer[numAllBlocks];
      for (int i = 0; i < buffers.length; i++) {
        buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
      }
    }

    private ByteBuffer[] getBuffers() {
      return buffers;
    }

    /** @param i index among all blocks; must refer to a parity block */
    byte[] getChecksumArray(int i) {
      return checksumArrays[i - numDataBlocks];
    }

    /**
     * Append bytes to the buffer of block {@code i}.
     *
     * @return the buffer position after the append
     */
    private int addTo(int i, byte[] b, int off, int len) {
      final ByteBuffer buf = buffers[i];
      final int pos = buf.position() + len;
      // a cell never holds more than cellSize bytes
      Preconditions.checkState(pos <= cellSize);
      buf.put(b, off, len);
      return pos;
    }

    /** Reset all buffers; parity buffers are additionally zero-filled. */
    private void clear() {
      for (int i = 0; i< numAllBlocks; i++) {
        buffers[i].clear();
        if (i >= numDataBlocks) {
          Arrays.fill(buffers[i].array(), (byte) 0);
        }
      }
    }

    /** Hand the backing arrays back to the byteArrayManager. */
    private void release() {
      for (int i = 0; i < numAllBlocks; i++) {
        byteArrayManager.release(buffers[i].array());
      }
    }

    /** Flip the data buffers only (parity buffers are left untouched). */
    private void flipDataBuffers() {
      for (int i = 0; i < numDataBlocks; i++) {
        buffers[i].flip();
      }
    }
  }

  private final Coordinator coordinator;
  private final CellBuffers cellBuffers;
  private final RawErasureEncoder encoder;
  private final List<StripedDataStreamer> streamers;
  private final DFSPacket[] currentPackets; // current Packet of each streamer

  /** Size of each striping cell, must be a multiple of bytesPerChecksum */
  private final int cellSize;
  private final int numAllBlocks;
  private final int numDataBlocks;
  private ExtendedBlock currentBlockGroup;
  private final String[] favoredNodes;
  private final List<StripedDataStreamer> failedStreamers;

  /** Construct a new output stream for creating a file. */
  DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
                         EnumSet<CreateFlag> flag, Progressable progress,
                         DataChecksum checksum, String[] favoredNodes)
                         throws IOException {
    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Creating DFSStripedOutputStream for " + src);
    }

    // layout parameters come from the file's erasure coding policy
    final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy();
    final int numParityBlocks = ecPolicy.getNumParityUnits();
    cellSize = ecPolicy.getCellSize();
    numDataBlocks = ecPolicy.getNumDataUnits();
    numAllBlocks = numDataBlocks + numParityBlocks;
    this.favoredNodes = favoredNodes;
    failedStreamers = new ArrayList<>();

    encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
        numDataBlocks, numParityBlocks);

    coordinator = new Coordinator(numAllBlocks);
    try {
      // CellBuffers reads cellSize/numAllBlocks, which are assigned above
      cellBuffers = new CellBuffers(numParityBlocks);
    } catch (InterruptedException ie) {
      throw DFSUtilClient.toInterruptedIOException(
          "Failed to create cell buffers", ie);
    }

    // one streamer per internal block, sharing the same coordinator
    streamers = new ArrayList<>(numAllBlocks);
    for (short i = 0; i < numAllBlocks; i++) {
      StripedDataStreamer streamer = new StripedDataStreamer(stat,
          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
          favoredNodes, i, coordinator);
      streamers.add(streamer);
    }
    currentPackets = new DFSPacket[streamers.size()];
    setCurrentStreamer(0);
  }

  StripedDataStreamer getStripedDataStreamer(int i) {
    return streamers.get(i);
  }

  int getCurrentIndex() {
    return getCurrentStreamer().getIndex();
  }

  private synchronized StripedDataStreamer getCurrentStreamer() {
    return (StripedDataStreamer) streamer;
  }

  /**
   * Switch the current streamer to the one at {@code newIdx}, saving the
   * current packet of the old streamer and restoring the saved packet of
   * the new one.
   *
   * @return the new current streamer
   */
  private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) {
    // backup currentPacket for current streamer
    if (streamer != null) {
      int oldIdx = streamers.indexOf(getCurrentStreamer());
      // indexOf returns -1 when the old streamer is no longer in the list
      if (oldIdx >= 0) {
        currentPackets[oldIdx] = currentPacket;
      }
    }

    streamer = getStripedDataStreamer(newIdx);
    currentPacket = currentPackets[newIdx];
    adjustChunkBoundary();

    return getCurrentStreamer();
  }

  /**
   * Encode the buffers, i.e. compute parities.
   *
   * @param encoder the encoder to run
   * @param numData number of leading entries of {@code buffers} that are
   *                data buffers; the remaining entries are parity buffers
   * @param buffers data buffers + parity buffers
   */
  private static void encode(RawErasureEncoder encoder, int numData,
      ByteBuffer[] buffers) {
    final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
    final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
    System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
    System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);

    encoder.encode(dataBuffers, parityBuffers);
  }

  /**
   * check all the existing StripedDataStreamer and find newly failed streamers.
   * @return The newly failed streamers.
   * @throws IOException if less than {@link #numDataBlocks} streamers are still
   *                     healthy.
++ */ ++ private Set<StripedDataStreamer> checkStreamers() throws IOException { ++ Set<StripedDataStreamer> newFailed = new HashSet<>(); ++ for(StripedDataStreamer s : streamers) { ++ if (!s.isHealthy() && !failedStreamers.contains(s)) { ++ newFailed.add(s); ++ } ++ } ++ ++ final int failCount = failedStreamers.size() + newFailed.size(); ++ if (LOG.isDebugEnabled()) { ++ LOG.debug("checkStreamers: " + streamers); ++ LOG.debug("healthy streamer count=" + (numAllBlocks - failCount)); ++ LOG.debug("original failed streamers: " + failedStreamers); ++ LOG.debug("newly failed streamers: " + newFailed); ++ } ++ if (failCount > (numAllBlocks - numDataBlocks)) { ++ throw new IOException("Failed: the number of failed blocks = " ++ + failCount + " > the number of data blocks = " ++ + (numAllBlocks - numDataBlocks)); ++ } ++ return newFailed; ++ } ++ ++ private void handleStreamerFailure(String err, Exception e) ++ throws IOException { ++ LOG.warn("Failed: " + err + ", " + this, e); ++ getCurrentStreamer().getErrorState().setInternalError(); ++ getCurrentStreamer().close(true); ++ checkStreamers(); ++ currentPacket = null; ++ } ++ ++ private void replaceFailedStreamers() { ++ assert streamers.size() == numAllBlocks; ++ for (short i = 0; i < numAllBlocks; i++) { ++ final StripedDataStreamer oldStreamer = getStripedDataStreamer(i); ++ if (!oldStreamer.isHealthy()) { ++ StripedDataStreamer streamer = new StripedDataStreamer(oldStreamer.stat, ++ dfsClient, src, oldStreamer.progress, ++ oldStreamer.checksum4WriteBlock, cachingStrategy, byteArrayManager, ++ favoredNodes, i, coordinator); ++ streamers.set(i, streamer); ++ currentPackets[i] = null; ++ if (i == 0) { ++ this.streamer = streamer; ++ } ++ streamer.start(); ++ } ++ } ++ } ++ ++ private void waitEndBlocks(int i) throws IOException { ++ while (getStripedDataStreamer(i).isHealthy()) { ++ final ExtendedBlock b = coordinator.endBlocks.takeWithTimeout(i); ++ if (b != null) { ++ StripedBlockUtil.checkBlocks(currentBlockGroup, i, 
b); ++ return; ++ } ++ } ++ } ++ ++ private void allocateNewBlock() throws IOException { ++ if (currentBlockGroup != null) { ++ for (int i = 0; i < numAllBlocks; i++) { ++ // sync all the healthy streamers before writing to the new block ++ waitEndBlocks(i); ++ } ++ } ++ failedStreamers.clear(); ++ // replace failed streamers ++ replaceFailedStreamers(); ++ ++ if (LOG.isDebugEnabled()) { ++ LOG.debug("Allocating new block group. The previous block group: " ++ + currentBlockGroup); ++ } ++ ++ // TODO collect excludedNodes from all the data streamers ++ final LocatedBlock lb = addBlock(null, dfsClient, src, currentBlockGroup, ++ fileId, favoredNodes); ++ assert lb.isStriped(); ++ if (lb.getLocations().length < numDataBlocks) { ++ throw new IOException("Failed to get " + numDataBlocks ++ + " nodes from namenode: blockGroupSize= " + numAllBlocks ++ + ", blocks.length= " + lb.getLocations().length); ++ } ++ // assign the new block to the current block group ++ currentBlockGroup = lb.getBlock(); ++ ++ final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( ++ (LocatedStripedBlock) lb, cellSize, numDataBlocks, ++ numAllBlocks - numDataBlocks); ++ for (int i = 0; i < blocks.length; i++) { ++ StripedDataStreamer si = getStripedDataStreamer(i); ++ if (si.isHealthy()) { // skipping failed data streamer ++ if (blocks[i] == null) { ++ // Set exception and close streamer as there is no block locations ++ // found for the parity block. 
++ LOG.warn("Failed to get block location for parity block, index=" + i); ++ si.getLastException().set( ++ new IOException("Failed to get following block, i=" + i)); ++ si.getErrorState().setInternalError(); ++ si.close(true); ++ } else { ++ coordinator.getFollowingBlocks().offer(i, blocks[i]); ++ } ++ } ++ } ++ } ++ ++ private boolean shouldEndBlockGroup() { ++ return currentBlockGroup != null && ++ currentBlockGroup.getNumBytes() == blockSize * numDataBlocks; ++ } ++ ++ @Override ++ protected synchronized void writeChunk(byte[] bytes, int offset, int len, ++ byte[] checksum, int ckoff, int cklen) throws IOException { ++ final int index = getCurrentIndex(); ++ final StripedDataStreamer current = getCurrentStreamer(); ++ final int pos = cellBuffers.addTo(index, bytes, offset, len); ++ final boolean cellFull = pos == cellSize; ++ ++ if (currentBlockGroup == null || shouldEndBlockGroup()) { ++ // the incoming data should belong to a new block. Allocate a new block. ++ allocateNewBlock(); ++ } ++ ++ currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len); ++ if (current.isHealthy()) { ++ try { ++ super.writeChunk(bytes, offset, len, checksum, ckoff, cklen); ++ } catch(Exception e) { ++ handleStreamerFailure("offset=" + offset + ", length=" + len, e); ++ } ++ } ++ ++ // Two extra steps are needed when a striping cell is full: ++ // 1. Forward the current index pointer ++ // 2. Generate parity packets if a full stripe of data cells are present ++ if (cellFull) { ++ int next = index + 1; ++ //When all data cells in a stripe are ready, we need to encode ++ //them and generate some parity cells. These cells will be ++ //converted to packets and put to their DataStreamer's queue. ++ if (next == numDataBlocks) { ++ cellBuffers.flipDataBuffers(); ++ writeParityCells(); ++ next = 0; ++ // check failure state for all the streamers. 
Bump GS if necessary ++ checkStreamerFailures(); ++ ++ // if this is the end of the block group, end each internal block ++ if (shouldEndBlockGroup()) { ++ for (int i = 0; i < numAllBlocks; i++) { ++ final StripedDataStreamer s = setCurrentStreamer(i); ++ if (s.isHealthy()) { ++ try { ++ endBlock(); ++ } catch (IOException ignored) {} ++ } ++ } ++ } ++ } ++ setCurrentStreamer(next); ++ } ++ } ++ ++ @Override ++ void enqueueCurrentPacketFull() throws IOException { ++ LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={}," ++ + " appendChunk={}, {}", currentPacket, src, getStreamer() ++ .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(), ++ getStreamer()); ++ enqueueCurrentPacket(); ++ adjustChunkBoundary(); ++ // no need to end block here ++ } ++ ++ private Set<StripedDataStreamer> markExternalErrorOnStreamers() { ++ Set<StripedDataStreamer> healthySet = new HashSet<>(); ++ for (StripedDataStreamer streamer : streamers) { ++ if (streamer.isHealthy() && ++ streamer.getStage() == BlockConstructionStage.DATA_STREAMING) { ++ streamer.setExternalError(); ++ healthySet.add(streamer); ++ } ++ } ++ return healthySet; ++ } ++ ++ /** ++ * Check and handle data streamer failures. This is called only when we have ++ * written a full stripe (i.e., enqueue all packets for a full stripe), or ++ * when we're closing the outputstream. ++ */ ++ private void checkStreamerFailures() throws IOException { ++ Set<StripedDataStreamer> newFailed = checkStreamers(); ++ if (newFailed.size() > 0) { ++ // for healthy streamers, wait till all of them have fetched the new block ++ // and flushed out all the enqueued packets. 
++ flushAllInternals(); ++ } ++ // get all the current failed streamers after the flush ++ newFailed = checkStreamers(); ++ while (newFailed.size() > 0) { ++ failedStreamers.addAll(newFailed); ++ coordinator.clearFailureStates(); ++ ++ // mark all the healthy streamers as external error ++ Set<StripedDataStreamer> healthySet = markExternalErrorOnStreamers(); ++ ++ // we have newly failed streamers, update block for pipeline ++ final ExtendedBlock newBG = updateBlockForPipeline(healthySet); ++ ++ // wait till all the healthy streamers to ++ // 1) get the updated block info ++ // 2) create new block outputstream ++ newFailed = waitCreatingNewStreams(healthySet); ++ if (newFailed.size() + failedStreamers.size() > ++ numAllBlocks - numDataBlocks) { ++ throw new IOException( ++ "Data streamers failed while creating new block streams: " ++ + newFailed + ". There are not enough healthy streamers."); ++ } ++ for (StripedDataStreamer failedStreamer : newFailed) { ++ assert !failedStreamer.isHealthy(); ++ } ++ ++ // TODO we can also succeed if all the failed streamers have not taken ++ // the updated block ++ if (newFailed.size() == 0) { ++ // reset external error state of all the streamers ++ for (StripedDataStreamer streamer : healthySet) { ++ assert streamer.isHealthy(); ++ streamer.getErrorState().reset(); ++ } ++ updatePipeline(newBG); ++ } ++ for (int i = 0; i < numAllBlocks; i++) { ++ coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0); ++ } ++ } ++ } ++ ++ private int checkStreamerUpdates(Set<StripedDataStreamer> failed, ++ Set<StripedDataStreamer> streamers) { ++ for (StripedDataStreamer streamer : streamers) { ++ if (!coordinator.updateStreamerMap.containsKey(streamer)) { ++ if (!streamer.isHealthy() && ++ coordinator.getNewBlocks().peek(streamer.getIndex()) != null) { ++ // this streamer had internal error before getting updated block ++ failed.add(streamer); ++ } ++ } ++ } ++ return coordinator.updateStreamerMap.size() + failed.size(); ++ } ++ ++ 
private Set<StripedDataStreamer> waitCreatingNewStreams( ++ Set<StripedDataStreamer> healthyStreamers) throws IOException { ++ Set<StripedDataStreamer> failed = new HashSet<>(); ++ final int expectedNum = healthyStreamers.size(); ++ final long socketTimeout = dfsClient.getConf().getSocketTimeout(); ++ // the total wait time should be less than the socket timeout, otherwise ++ // a slow streamer may cause other streamers to timeout. here we wait for ++ // half of the socket timeout ++ long remaingTime = socketTimeout > 0 ? socketTimeout/2 : Long.MAX_VALUE; ++ final long waitInterval = 1000; ++ synchronized (coordinator) { ++ while (checkStreamerUpdates(failed, healthyStreamers) < expectedNum ++ && remaingTime > 0) { ++ try { ++ long start = Time.monotonicNow(); ++ coordinator.wait(waitInterval); ++ remaingTime -= Time.monotonicNow() - start; ++ } catch (InterruptedException e) { ++ throw DFSUtilClient.toInterruptedIOException("Interrupted when waiting" + ++ " for results of updating striped streamers", e); ++ } ++ } ++ } ++ synchronized (coordinator) { ++ for (StripedDataStreamer streamer : healthyStreamers) { ++ if (!coordinator.updateStreamerMap.containsKey(streamer)) { ++ // close the streamer if it is too slow to create new connection ++ streamer.setStreamerAsClosed(); ++ failed.add(streamer); ++ } ++ } ++ } ++ for (Map.Entry<StripedDataStreamer, Boolean> entry : ++ coordinator.updateStreamerMap.entrySet()) { ++ if (!entry.getValue()) { ++ failed.add(entry.getKey()); ++ } ++ } ++ for (StripedDataStreamer failedStreamer : failed) { ++ healthyStreamers.remove(failedStreamer); ++ } ++ return failed; ++ } ++ ++ /** ++ * Call {@link ClientProtocol#updateBlockForPipeline} and assign updated block ++ * to healthy streamers. ++ * @param healthyStreamers The healthy data streamers. These streamers join ++ * the failure handling. 
*/
  private ExtendedBlock updateBlockForPipeline(
      Set<StripedDataStreamer> healthyStreamers) throws IOException {
    final LocatedBlock updated = dfsClient.namenode.updateBlockForPipeline(
        currentBlockGroup, dfsClient.clientName);
    // copy of the current block group carrying the new generation stamp
    final long newGS = updated.getBlock().getGenerationStamp();
    ExtendedBlock newBlock = new ExtendedBlock(currentBlockGroup);
    newBlock.setGenerationStamp(newGS);
    final LocatedBlock[] updatedBlks = StripedBlockUtil.parseStripedBlockGroup(
        (LocatedStripedBlock) updated, cellSize, numDataBlocks,
        numAllBlocks - numDataBlocks);

    for (int i = 0; i < numAllBlocks; i++) {
      StripedDataStreamer si = getStripedDataStreamer(i);
      if (healthyStreamers.contains(si)) {
        // only the block and its token are populated in the offered block
        final LocatedBlock lb = new LocatedBlock(new ExtendedBlock(newBlock),
            null, null, null, -1, updated.isCorrupt(), null);
        lb.setBlockToken(updatedBlks[i].getBlockToken());
        coordinator.getNewBlocks().offer(i, lb);
      }
    }
    return newBlock;
  }

  /**
   * Report the new block group and the first node/storage of each healthy
   * streamer to the namenode, then adopt {@code newBG} as the current block
   * group.
   */
  private void updatePipeline(ExtendedBlock newBG) throws IOException {
    final DatanodeInfo[] newNodes = new DatanodeInfo[numAllBlocks];
    final String[] newStorageIDs = new String[numAllBlocks];
    for (int i = 0; i < numAllBlocks; i++) {
      final StripedDataStreamer streamer = getStripedDataStreamer(i);
      final DatanodeInfo[] nodes = streamer.getNodes();
      final String[] storageIDs = streamer.getStorageIDs();
      if (streamer.isHealthy() && nodes != null && storageIDs != null) {
        newNodes[i] = nodes[0];
        newStorageIDs[i] = storageIDs[0];
      } else {
        // placeholder entries keep the arrays aligned with block indices
        newNodes[i] = new DatanodeInfo(DatanodeID.EMPTY_DATANODE_ID);
        newStorageIDs[i] = "";
      }
    }
    dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup,
        newBG, newNodes, newStorageIDs);
    currentBlockGroup = newBG;
  }

  /** @return number of data bytes in one full stripe */
  private int stripeDataSize() {
    return numDataBlocks * cellSize;
  }

  /** Not supported; always throws {@link UnsupportedOperationException}. */
  @Override
  public void hflush() {
    throw new UnsupportedOperationException();
  }

  /** Not supported; always throws {@link UnsupportedOperationException}. */
  @Override
  public void hsync() {
    throw new UnsupportedOperationException();
  }

  /** Start every streamer. */
  @Override
  protected synchronized void start() {
    for (StripedDataStreamer streamer : streamers) {
      streamer.start();
    }
  }

  /**
   * Record a lease-timeout exception on every streamer, force-close the
   * streamer threads and end the file lease. Does nothing if already closed.
   */
  @Override
  synchronized void abort() throws IOException {
    if (isClosed()) {
      return;
    }
    for (StripedDataStreamer streamer : streamers) {
      streamer.getLastException().set(new IOException("Lease timeout of "
          + (dfsClient.getConf().getHdfsTimeout()/1000) +
          " seconds expired."));
    }
    closeThreads(true);
    dfsClient.endFileLease(fileId);
  }

  /** @return true if the stream is marked closed or all streamers closed */
  @Override
  boolean isClosed() {
    if (closed) {
      return true;
    }
    for(StripedDataStreamer s : streamers) {
      if (!s.streamerClosed()) {
        return false;
      }
    }
    return true;
  }

  /**
   * Close, join and disconnect every streamer, then mark the stream closed.
   * IOExceptions raised while handling a close failure are collected and
   * thrown together after all streamers have been processed.
   */
  @Override
  protected void closeThreads(boolean force) throws IOException {
    final MultipleIOException.Builder b = new MultipleIOException.Builder();
    try {
      for (StripedDataStreamer streamer : streamers) {
        try {
          streamer.close(force);
          streamer.join();
          streamer.closeSocket();
        } catch (Exception e) {
          try {
            // NOTE(review): handleStreamerFailure acts on the *current*
            // streamer, not on the loop variable - confirm this is intended.
            handleStreamerFailure("force=" + force, e);
          } catch (IOException ioe) {
            b.add(ioe);
          }
        } finally {
          streamer.setSocketToNull();
        }
      }
    } finally {
      setClosed();
    }
    final IOException ioe = b.build();
    if (ioe != null) {
      throw ioe;
    }
  }

  /**
   * Zero-pad and flip all cell buffers when the last stripe is incomplete.
   *
   * @return false if the block group ends on a stripe boundary (nothing to
   *         do), true if the buffers were prepared for parity generation
   */
  private boolean generateParityCellsForLastStripe() {
    final long currentBlockGroupBytes = currentBlockGroup == null ?
        0 : currentBlockGroup.getNumBytes();
    final long lastStripeSize = currentBlockGroupBytes % stripeDataSize();
    if (lastStripeSize == 0) {
      return false;
    }

    // the parity cell is as long as the longest data cell of the last stripe
    final long parityCellSize = lastStripeSize < cellSize?
        lastStripeSize : cellSize;
    final ByteBuffer[] buffers = cellBuffers.getBuffers();

    for (int i = 0; i < numAllBlocks; i++) {
      // Pad zero bytes to make all cells exactly the size of parityCellSize
      // If internal block is smaller than parity block, pad zero bytes.
      // Also pad zero bytes to all parity cells
      final int position = buffers[i].position();
      assert position <= parityCellSize : "If an internal block is smaller" +
          " than parity block, then its last cell should be small than last" +
          " parity cell";
      for (int j = 0; j < parityCellSize - position; j++) {
        buffers[i].put((byte) 0);
      }
      buffers[i].flip();
    }
    return true;
  }

  /** Encode the buffered data cells, write each parity cell, reset buffers. */
  void writeParityCells() throws IOException {
    final ByteBuffer[] buffers = cellBuffers.getBuffers();
    //encode the data cells
    encode(encoder, numDataBlocks, buffers);
    for (int i = numDataBlocks; i < numAllBlocks; i++) {
      writeParity(i, buffers[i], cellBuffers.getChecksumArray(i));
    }
    cellBuffers.clear();
  }

  /**
   * Make streamer {@code index} current and, if it is healthy, write the
   * parity cell through it one checksum chunk at a time.
   *
   * @param index       index of the parity streamer
   * @param buffer      parity cell; bytes [0, limit) are written
   * @param checksumBuf scratch array receiving the chunk checksums
   */
  void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf)
      throws IOException {
    final StripedDataStreamer current = setCurrentStreamer(index);
    final int len = buffer.limit();

    final long oldBytes = current.getBytesCurBlock();
    if (current.isHealthy()) {
      try {
        DataChecksum sum = getDataChecksum();
        sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
        for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
          // the last chunk may be shorter than bytesPerChecksum
          int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
          int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
          super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
              getChecksumSize());
        }
      } catch(Exception e) {
        handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
      }
    }
  }

  /** Mark the stream closed and release streamer and cell-buffer resources. */
  @Override
  void setClosed() {
    super.setClosed();
    for (int i = 0; i < numAllBlocks; i++) {
      getStripedDataStreamer(i).release();
    }
    cellBuffers.release();
  }
++ ++ @Override ++ protected synchronized void closeImpl() throws IOException { ++ if (isClosed()) { ++ final MultipleIOException.Builder b = new MultipleIOException.Builder(); ++ for(int i = 0; i < streamers.size(); i++) { ++ final StripedDataStreamer si = getStripedDataStreamer(i); ++ try { ++ si.getLastException().check(true); ++ } catch (IOException e) { ++ b.add(e); ++ } ++ } ++ final IOException ioe = b.build(); ++ if (ioe != null) { ++ throw ioe; ++ } ++ return; ++ } ++ ++ try { ++ // flush from all upper layers ++ flushBuffer(); ++ // if the last stripe is incomplete, generate and write parity cells ++ if (generateParityCellsForLastStripe()) { ++ writeParityCells(); ++ } ++ enqueueAllCurrentPackets(); ++ ++ // flush all the data packets ++ flushAllInternals(); ++ // check failures ++ checkStreamerFailures(); ++ ++ for (int i = 0; i < numAllBlocks; i++) { ++ final StripedDataStreamer s = setCurrentStreamer(i); ++ if (s.isHealthy()) { ++ try { ++ if (s.getBytesCurBlock() > 0) { ++ setCurrentPacketToEmpty(); ++ } ++ // flush the last "close" packet to Datanode ++ flushInternal(); ++ } catch(Exception e) { ++ // TODO for both close and endBlock, we currently do not handle ++ // failures when sending the last packet. We actually do not need to ++ // bump GS for this kind of failure. Thus counting the total number ++ // of failures may be good enough. 
++ } ++ } ++ } ++ ++ closeThreads(false); ++ TraceScope scope = dfsClient.getTracer().newScope("completeFile"); ++ try { ++ completeFile(currentBlockGroup); ++ } finally { ++ scope.close(); ++ } ++ dfsClient.endFileLease(fileId); ++ } catch (ClosedChannelException ignored) { ++ } finally { ++ setClosed(); ++ } ++ } ++ ++ private void enqueueAllCurrentPackets() throws IOException { ++ int idx = streamers.indexOf(getCurrentStreamer()); ++ for(int i = 0; i < streamers.size(); i++) { ++ final StripedDataStreamer si = setCurrentStreamer(i); ++ if (si.isHealthy() && currentPacket != null) { ++ try { ++ enqueueCurrentPacket(); ++ } catch (IOException e) { ++ handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e); ++ } ++ } ++ } ++ setCurrentStreamer(idx); ++ } ++ ++ void flushAllInternals() throws IOException { ++ int current = getCurrentIndex(); ++ ++ for (int i = 0; i < numAllBlocks; i++) { ++ final StripedDataStreamer s = setCurrentStreamer(i); ++ if (s.isHealthy()) { ++ try { ++ // flush all data to Datanode ++ flushInternal(); ++ } catch(Exception e) { ++ handleStreamerFailure("flushInternal " + s, e); ++ } ++ } ++ } ++ setCurrentStreamer(current); ++ } ++ ++ static void sleep(long ms, String op) throws InterruptedIOException { ++ try { ++ Thread.sleep(ms); ++ } catch(InterruptedException ie) { ++ throw DFSUtilClient.toInterruptedIOException( ++ "Sleep interrupted during " + op, ie); ++ } ++ } ++ ++ @Override ++ ExtendedBlock getBlock() { ++ return currentBlockGroup; ++ } ++} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index 359886e,e275afb..f96ae65 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@@ -53,6 -54,6 +54,7 @@@ import org.slf4j.LoggerFactory import javax.net.SocketFactory; import java.io.IOException; ++import java.io.InterruptedIOException; import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.InetSocketAddress; @@@ -628,4 -652,4 +653,11 @@@ public class DFSUtilClient return URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + namenode.getHostName() + portString); } ++ ++ public static InterruptedIOException toInterruptedIOException(String message, ++ InterruptedException e) { ++ final InterruptedIOException iioe = new InterruptedIOException(message); ++ iioe.initCause(e); ++ return iioe; ++ } }
