/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.util;

import com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSStripedOutputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.security.token.Token;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * When accessing a file in striped layout, operations on logical byte ranges
 * in the file need to be mapped to physical byte ranges on block files stored
 * on DataNodes. This utility class facilitates this mapping by defining and
 * exposing a number of striping-related concepts. The most basic ones are
 * illustrated in the following diagram. Unless otherwise specified, all
 * range-related calculations are inclusive (the end offset of the previous
 * range should be 1 byte lower than the start offset of the next one).
 *
 *  | <----  Block Group ----> |   <- Block Group: logical unit composing
 *  |                          |        striped HDFS files.
 *  blk_0      blk_1       blk_2   <- Internal Blocks: each internal block
 *    |          |           |          represents a physically stored local
 *    v          v           v          block file
 * +------+   +------+   +------+
 * |cell_0|   |cell_1|   |cell_2|  <- {@link StripingCell} represents the
 * +------+   +------+   +------+       logical order that a Block Group should
 * |cell_3|   |cell_4|   |cell_5|       be accessed: cell_0, cell_1, ...
 * +------+   +------+   +------+
 * |cell_6|   |cell_7|   |cell_8|
 * +------+   +------+   +------+
 * |cell_9|
 * +------+  <- A cell contains cellSize bytes of data
 */
@InterfaceAudience.Private
public class StripedBlockUtil {

  /** Mask applied to a block ID by {@link #getBlockIndex(Block)}. */
  private static final long BLOCK_GROUP_INDEX_MASK = 15;

  /**
   * This method parses a striped block group into individual blocks.
   *
   * @param bg The striped block group
   * @param cellSize The size of a striping cell
   * @param dataBlkNum The number of data blocks
   * @param parityBlkNum The number of parity blocks
   * @return An array of size {@code dataBlkNum + parityBlkNum} containing the
   *         blocks in the group, indexed by their logical index in the group.
   *         Slots for which the block group reports no index are left null.
   */
  public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
      int cellSize, int dataBlkNum, int parityBlkNum) {
    final int locatedBGSize = bg.getBlockIndices().length;
    final int groupWidth = dataBlkNum + parityBlkNum;
    final LocatedBlock[] lbs = new LocatedBlock[groupWidth];
    for (int i = 0; i < locatedBGSize; i++) {
      final int idx = bg.getBlockIndices()[i];
      // for now we do not use redundant replica of an internal block:
      // only the first location reported for a given index is kept
      if (idx < groupWidth && lbs[idx] == null) {
        lbs[idx] = constructInternalBlock(bg, i, cellSize,
            dataBlkNum, idx);
      }
    }
    return lbs;
  }

  /**
   * This method creates an internal block at the given index of a block group.
   *
   * @param bg The striped block group
   * @param idxInReturnedLocs The index in the stored locations in the
   *                          {@link LocatedStripedBlock} object
   * @param cellSize The size of a striping cell
   * @param dataBlkNum The number of data blocks
   * @param idxInBlockGroup The logical index in the striped block group
   * @return The constructed internal block
   */
  public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
      int idxInReturnedLocs, int cellSize, int dataBlkNum,
      int idxInBlockGroup) {
    final ExtendedBlock blk = constructInternalBlock(
        bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
    final LocatedBlock locatedBlock;
    if (idxInReturnedLocs < bg.getLocations().length) {
      locatedBlock = new LocatedBlock(blk,
          new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
          new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
          new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
          bg.getStartOffset(), bg.isCorrupt(), null);
    } else {
      // no location is known for this internal block
      locatedBlock = new LocatedBlock(blk, null, null, null,
          bg.getStartOffset(), bg.isCorrupt(), null);
    }
    Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens();
    if (idxInReturnedLocs < blockTokens.length) {
      locatedBlock.setBlockToken(blockTokens[idxInReturnedLocs]);
    }
    return locatedBlock;
  }

  /**
   * This method creates an internal {@link ExtendedBlock} at the given index
   * of a block group. The returned block is a copy of the block group whose
   * ID is offset by {@code idxInBlockGroup} and whose length is the length of
   * that internal block.
   *
   * @param blockGroup The block group
   * @param cellSize The size of a striping cell
   * @param dataBlkNum The number of data blocks
   * @param idxInBlockGroup The logical index in the striped block group
   * @return The constructed internal block
   */
  public static ExtendedBlock constructInternalBlock(ExtendedBlock blockGroup,
      int cellSize, int dataBlkNum, int idxInBlockGroup) {
    ExtendedBlock block = new ExtendedBlock(blockGroup);
    block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup);
    block.setNumBytes(getInternalBlockLength(blockGroup.getNumBytes(),
        cellSize, dataBlkNum, idxInBlockGroup));
    return block;
  }

  /**
   * Get the size of an internal block at the given index of a block group.
   *
   * @param dataSize Size of the block group only counting data blocks
   * @param cellSize The size of a striping cell
   * @param numDataBlocks The number of data blocks
   * @param i The logical index in the striped block group
   * @return The size of the internal block at the specified index
   * @throws IllegalArgumentException if {@code dataSize < 0},
   *         {@code cellSize <= 0}, {@code numDataBlocks <= 0} or {@code i < 0}
   */
  public static long getInternalBlockLength(long dataSize,
      int cellSize, int numDataBlocks, int i) {
    Preconditions.checkArgument(dataSize >= 0);
    Preconditions.checkArgument(cellSize > 0);
    Preconditions.checkArgument(numDataBlocks > 0);
    Preconditions.checkArgument(i >= 0);
    // Size of each stripe (only counting data blocks). Computed in long so
    // that the product cannot wrap around.
    final long stripeSize = (long) cellSize * numDataBlocks;
    // If block group ends at stripe boundary, each internal block has an equal
    // share of the group
    final long lastStripeDataLen = dataSize % stripeSize;
    if (lastStripeDataLen == 0) {
      return dataSize / numDataBlocks;
    }

    final long numStripes = (dataSize - 1) / stripeSize + 1;
    return (numStripes - 1) * cellSize
        + lastCellSize(lastStripeDataLen, cellSize, numDataBlocks, i);
  }

  /**
   * Size of the cell that block {@code i} holds in the last, partial stripe.
   *
   * @param size Number of data bytes in the last stripe
   * @param cellSize The size of a striping cell
   * @param numDataBlocks The number of data blocks
   * @param i The logical index in the striped block group
   * @return A value in {@code [0, cellSize]}
   */
  private static long lastCellSize(long size, int cellSize, int numDataBlocks,
      int i) {
    if (i < numDataBlocks) {
      // parity block size (i.e. i >= numDataBlocks) is the same as
      // the first data block size (i.e. i = 0).
      size -= (long) i * cellSize;
      if (size < 0) {
        size = 0;
      }
    }
    return Math.min(size, cellSize);
  }

  /**
   * Given a byte's offset in an internal block, calculate the offset in
   * the block group.
   *
   * @param cellSize The size of a striping cell
   * @param dataBlkNum The number of data blocks
   * @param offsetInBlk The byte offset in the internal block
   * @param idxInBlockGroup The logical index of the internal block
   * @return The corresponding byte offset in the block group
   */
  public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum,
      long offsetInBlk, int idxInBlockGroup) {
    // All arithmetic is done in long: the result can exceed
    // Integer.MAX_VALUE for large block groups.
    final long cellIdxInBlk = offsetInBlk / cellSize;
    return cellIdxInBlk * cellSize * dataBlkNum // n full stripes before offset
        + (long) idxInBlockGroup * cellSize     // m full cells before offset
        + offsetInBlk % cellSize;               // partial cell
  }

  /**
   * Get the next completed striped read task.
   *
   * @param readService The completion service the read tasks were submitted to
   * @param futures Map from each outstanding read task to its block index;
   *                the entry of the returned task is removed. Must not be
   *                empty.
   * @param timeoutMillis Maximum time to wait in milliseconds; a value that
   *                      is not positive means wait until a task completes
   * @return {@link StripingChunkReadResult} indicating the status of the read
   *         task (successful, failed, cancelled or timed out) and the block
   *         index of the task. If the method times out without getting any
   *         completed read tasks, -1 is returned as block index.
   * @throws InterruptedException if interrupted while waiting for a task
   */
  public static StripingChunkReadResult getNextCompletedStripedRead(
      CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
      final long timeoutMillis) throws InterruptedException {
    Preconditions.checkArgument(!futures.isEmpty());
    Future<Void> future = null;
    try {
      if (timeoutMillis > 0) {
        future = readService.poll(timeoutMillis, TimeUnit.MILLISECONDS);
      } else {
        future = readService.take();
      }
      if (future != null) {
        // surfaces the task's failure / cancellation, if any
        future.get();
        return new StripingChunkReadResult(futures.remove(future),
            StripingChunkReadResult.SUCCESSFUL);
      } else {
        return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
      }
    } catch (ExecutionException e) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("ExecutionException " + e);
      }
      return new StripingChunkReadResult(futures.remove(future),
          StripingChunkReadResult.FAILED);
    } catch (CancellationException e) {
      return new StripingChunkReadResult(futures.remove(future),
          StripingChunkReadResult.CANCELLED);
    }
  }

  /**
   * Get the total usage of the striped blocks, which is the total of data
   * blocks and parity blocks.
   *
   * @param numDataBlkBytes
   *          Size of the block group only counting data blocks
   * @param dataBlkNum
   *          The number of data blocks
   * @param parityBlkNum
   *          The number of parity blocks
   * @param cellSize
   *          The size of a striping cell
   * @return The total usage of data blocks and parity blocks
   */
  public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
      int dataBlkNum, int parityBlkNum, int cellSize) {
    // Every index >= dataBlkNum yields the same (parity) block length, so
    // the first parity index is used.
    final int parityIndex = dataBlkNum;
    long numParityBlkBytes = getInternalBlockLength(numDataBlkBytes, cellSize,
        dataBlkNum, parityIndex) * parityBlkNum;
    return numDataBlkBytes + numParityBlkBytes;
  }

  /**
   * Initialize the decoding input buffers based on the chunk states in an
   * {@link AlignedStripe}. For each data chunk that was not initially
   * requested (i.e. is still null), create a chunk that uses the matching
   * decoding input buffer as transfer destination.
   *
   * @param alignedStripe The stripe to prepare decoding inputs for
   * @param dataBlkNum The number of data blocks
   * @param parityBlkNum The number of parity blocks
   * @return The decode input buffers, one per internal block, each
   *         {@code alignedStripe.getSpanInBlock()} bytes long, indexed in
   *         decode order (see {@link #convertIndex4Decode})
   */
  public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
      int dataBlkNum, int parityBlkNum) {
    final int span = (int) alignedStripe.getSpanInBlock();
    byte[][] decodeInputs = new byte[dataBlkNum + parityBlkNum][span];
    // read the full data aligned stripe
    for (int i = 0; i < dataBlkNum; i++) {
      if (alignedStripe.chunks[i] == null) {
        final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
        alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
        alignedStripe.chunks[i].addByteArraySlice(0, span);
      }
    }
    return decodeInputs;
  }

  /**
   * Some fetched {@link StripingChunk} might be stored in original application
   * buffer instead of prepared decode input buffers. Some others are beyond
   * the range of the internal blocks and should correspond to all zero bytes.
   * When all pending requests have returned, this method should be called to
   * finalize decode input buffers.
   *
   * Inputs whose chunk is neither FETCHED nor ALLZERO are set to null.
   *
   * @param decodeInputs The buffers created by {@link #initDecodeInputs}
   * @param dataBlkNum The number of data blocks
   * @param parityBlkNum The number of parity blocks
   * @param alignedStripe The stripe being decoded
   */
  public static void finalizeDecodeInputs(final byte[][] decodeInputs,
      int dataBlkNum, int parityBlkNum, AlignedStripe alignedStripe) {
    for (int i = 0; i < alignedStripe.chunks.length; i++) {
      final StripingChunk chunk = alignedStripe.chunks[i];
      final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
      if (chunk != null && chunk.state == StripingChunk.FETCHED) {
        chunk.copyTo(decodeInputs[decodeIndex]);
      } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
        Arrays.fill(decodeInputs[decodeIndex], (byte) 0);
      } else {
        decodeInputs[decodeIndex] = null;
      }
    }
  }

  /**
   * Currently decoding requires parity chunks are before data chunks.
   * The indices are opposite to what we store in NN. In future we may
   * improve the decoding to make the indices order the same as in NN.
   *
   * @param index The index to convert
   * @param dataBlkNum The number of data blocks
   * @param parityBlkNum The number of parity blocks
   * @return converted index
   */
  public static int convertIndex4Decode(int index, int dataBlkNum,
      int parityBlkNum) {
    return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
  }

  /**
   * Inverse of {@link #convertIndex4Decode}: map an index in decode order
   * (parity first) back to the index in block group order (data first).
   *
   * @param index The decode index to convert
   * @param dataBlkNum The number of data blocks
   * @param parityBlkNum The number of parity blocks
   * @return the index in the block group
   */
  public static int convertDecodeIndexBack(int index, int dataBlkNum,
      int parityBlkNum) {
    return index < parityBlkNum ? index + dataBlkNum : index - parityBlkNum;
  }

  /**
   * Decode based on the given input buffers and erasure coding policy, and
   * copy the recovered bytes into the chunks that are in MISSING state.
   *
   * @param decodeInputs The finalized decode input buffers
   * @param alignedStripe The stripe being decoded
   * @param dataBlkNum The number of data blocks
   * @param parityBlkNum The number of parity blocks
   * @param decoder The raw decoder to use
   */
  public static void decodeAndFillBuffer(final byte[][] decodeInputs,
      AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
      RawErasureDecoder decoder) {
    // Step 1: prepare indices and output buffers for missing data units
    int[] decodeIndices = new int[parityBlkNum];
    int pos = 0;
    for (int i = 0; i < dataBlkNum; i++) {
      if (alignedStripe.chunks[i] != null &&
          alignedStripe.chunks[i].state == StripingChunk.MISSING) {
        decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
      }
    }
    // trim to the number of missing units actually found
    decodeIndices = Arrays.copyOf(decodeIndices, pos);
    byte[][] decodeOutputs =
        new byte[decodeIndices.length][(int) alignedStripe.getSpanInBlock()];

    // Step 2: decode into prepared output buffers
    decoder.decode(decodeInputs, decodeIndices, decodeOutputs);

    // Step 3: fill original application buffer with decoded data
    for (int i = 0; i < decodeIndices.length; i++) {
      int missingBlkIdx = convertDecodeIndexBack(decodeIndices[i],
          dataBlkNum, parityBlkNum);
      StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
      if (chunk.state == StripingChunk.MISSING) {
        chunk.copyFrom(decodeOutputs[i]);
      }
    }
  }

  /**
   * Similar functionality with {@link #divideByteRangeIntoStripes}, but is used
   * by stateful read and uses ByteBuffer as reading target buffer. Besides the
   * read range is within a single stripe thus the calculation logic is simpler.
   *
   * Note that the position and limit of {@code buf} are modified while the
   * per-chunk slices are created.
   *
   * @param ecPolicy The codec policy for the file
   * @param cellSize Cell size of stripe
   * @param blockGroup The striped block group
   * @param rangeStartInBlockGroup The byte range's start offset in block group
   * @param rangeEndInBlockGroup The byte range's end offset in block group
   * @param buf Destination buffer of the read operation for the byte range
   * @return The aligned stripes covering the byte range
   */
  public static AlignedStripe[] divideOneStripe(ErasureCodingPolicy ecPolicy,
      int cellSize, LocatedStripedBlock blockGroup, long rangeStartInBlockGroup,
      long rangeEndInBlockGroup, ByteBuffer buf) {
    final int dataBlkNum = ecPolicy.getNumDataUnits();
    // Step 1: map the byte range to StripingCells
    StripingCell[] cells = getStripingCellsOfByteRange(ecPolicy, cellSize,
        blockGroup, rangeStartInBlockGroup, rangeEndInBlockGroup);

    // Step 2: get the unmerged ranges on each internal block
    VerticalRange[] ranges = getRangesForInternalBlocks(ecPolicy, cellSize,
        cells);

    // Step 3: merge into stripes
    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);

    // Step 4: calculate each chunk's position in destination buffer. Since the
    // whole read range is within a single stripe, the logic is simpler here.
    int bufOffset = (int) (rangeStartInBlockGroup % ((long) cellSize * dataBlkNum));
    for (StripingCell cell : cells) {
      // widen before multiplying so the product cannot wrap around
      long cellStart = (long) cell.idxInInternalBlk * cellSize + cell.offset;
      long cellEnd = cellStart + cell.size - 1;
      for (AlignedStripe s : stripes) {
        long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
        long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
        long overlapEnd = Math.min(cellEnd, stripeEnd);
        int overLapLen = (int) (overlapEnd - overlapStart + 1);
        if (overLapLen > 0) {
          Preconditions.checkState(s.chunks[cell.idxInStripe] == null);
          final int pos = (int) (bufOffset + overlapStart - cellStart);
          buf.position(pos);
          buf.limit(pos + overLapLen);
          s.chunks[cell.idxInStripe] = new StripingChunk(buf.slice());
        }
      }
      bufOffset += cell.size;
    }

    // Step 5: prepare ALLZERO blocks
    prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum);
    return stripes;
  }

  /**
   * This method divides a requested byte range into an array of inclusive
   * {@link AlignedStripe}.
   *
   * At most 5 stripes will be generated from each logical range, as
   * demonstrated in the header of {@link AlignedStripe}.
   *
   * @param ecPolicy The codec policy for the file, which carries the numbers
   *                 of data / parity blocks
   * @param cellSize Cell size of stripe
   * @param blockGroup The striped block group
   * @param rangeStartInBlockGroup The byte range's start offset in block group
   * @param rangeEndInBlockGroup The byte range's end offset in block group
   * @param buf Destination buffer of the read operation for the byte range
   * @param offsetInBuf Start offset into the destination buffer
   * @return The aligned stripes covering the byte range
   */
  public static AlignedStripe[] divideByteRangeIntoStripes(ErasureCodingPolicy ecPolicy,
      int cellSize, LocatedStripedBlock blockGroup,
      long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf,
      int offsetInBuf) {

    // Step 0: analyze range and calculate basic parameters
    final int dataBlkNum = ecPolicy.getNumDataUnits();

    // Step 1: map the byte range to StripingCells
    StripingCell[] cells = getStripingCellsOfByteRange(ecPolicy, cellSize,
        blockGroup, rangeStartInBlockGroup, rangeEndInBlockGroup);

    // Step 2: get the unmerged ranges on each internal block
    VerticalRange[] ranges = getRangesForInternalBlocks(ecPolicy, cellSize,
        cells);

    // Step 3: merge into at most 5 stripes
    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);

    // Step 4: calculate each chunk's position in destination buffer
    calculateChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf);

    // Step 5: prepare ALLZERO blocks
    prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum);

    return stripes;
  }

  /**
   * Map the logical byte range to a set of inclusive {@link StripingCell}
   * instances, each representing the overlap of the byte range to a cell
   * used by {@link DFSStripedOutputStream} in encoding.
   *
   * @throws IllegalArgumentException if the range is empty or extends beyond
   *         the size of the block group
   */
  @VisibleForTesting
  private static StripingCell[] getStripingCellsOfByteRange(ErasureCodingPolicy ecPolicy,
      int cellSize, LocatedStripedBlock blockGroup,
      long rangeStartInBlockGroup, long rangeEndInBlockGroup) {
    Preconditions.checkArgument(
        rangeStartInBlockGroup <= rangeEndInBlockGroup &&
            rangeEndInBlockGroup < blockGroup.getBlockSize());
    long len = rangeEndInBlockGroup - rangeStartInBlockGroup + 1;
    int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
    int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize);
    int numCells = lastCellIdxInBG - firstCellIdxInBG + 1;
    StripingCell[] cells = new StripingCell[numCells];

    // The first cell may start in the middle of a cell and, when the range
    // fits in one cell, may also end before the cell boundary.
    final int firstCellOffset = (int) (rangeStartInBlockGroup % cellSize);
    final int firstCellSize =
        (int) Math.min(cellSize - (rangeStartInBlockGroup % cellSize), len);
    cells[0] = new StripingCell(ecPolicy, firstCellSize, firstCellIdxInBG,
        firstCellOffset);
    if (lastCellIdxInBG != firstCellIdxInBG) {
      // The last cell always starts at offset 0 and may be partial.
      final int lastCellSize = (int) (rangeEndInBlockGroup % cellSize) + 1;
      cells[numCells - 1] = new StripingCell(ecPolicy, lastCellSize,
          lastCellIdxInBG, 0);
    }

    // Cells strictly between the first and the last are full cells.
    for (int i = 1; i < numCells - 1; i++) {
      cells[i] = new StripingCell(ecPolicy, cellSize, i + firstCellIdxInBG, 0);
    }

    return cells;
  }

  /**
   * Given a logical byte range, mapped to each {@link StripingCell}, calculate
   * the physical byte range (inclusive) on each stored internal block.
   *
   * @return An array with one entry per internal block; entries of data
   *         blocks that are not touched by any cell are null
   */
  @VisibleForTesting
  private static VerticalRange[] getRangesForInternalBlocks(ErasureCodingPolicy ecPolicy,
      int cellSize, StripingCell[] cells) {
    int dataBlkNum = ecPolicy.getNumDataUnits();
    int parityBlkNum = ecPolicy.getNumParityUnits();

    VerticalRange[] ranges = new VerticalRange[dataBlkNum + parityBlkNum];

    long earliestStart = Long.MAX_VALUE;
    long latestEnd = -1;
    for (StripingCell cell : cells) {
      // iterate through all cells and update the list of StripeRanges
      if (ranges[cell.idxInStripe] == null) {
        // widen before multiplying so the product cannot wrap around
        ranges[cell.idxInStripe] = new VerticalRange(
            (long) cell.idxInInternalBlk * cellSize + cell.offset, cell.size);
      } else {
        ranges[cell.idxInStripe].spanInBlock += cell.size;
      }
      VerticalRange range = ranges[cell.idxInStripe];
      if (range.offsetInBlock < earliestStart) {
        earliestStart = range.offsetInBlock;
      }
      if (range.offsetInBlock + range.spanInBlock - 1 > latestEnd) {
        latestEnd = range.offsetInBlock + range.spanInBlock - 1;
      }
    }

    // Each parity block should be fetched at maximum range of all data blocks
    for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
      ranges[i] = new VerticalRange(earliestStart,
          latestEnd - earliestStart + 1);
    }

    return ranges;
  }

  /**
   * Merge byte ranges on each internal block into a set of inclusive
   * {@link AlignedStripe} instances. Every start offset and every
   * (exclusive) end offset of a range is a cut point; one stripe is created
   * between each pair of consecutive cut points.
   */
  private static AlignedStripe[] mergeRangesForInternalBlocks(
      ErasureCodingPolicy ecPolicy, VerticalRange[] ranges) {
    int dataBlkNum = ecPolicy.getNumDataUnits();
    int parityBlkNum = ecPolicy.getNumParityUnits();
    List<AlignedStripe> stripes = new ArrayList<>();
    SortedSet<Long> stripePoints = new TreeSet<>();
    for (VerticalRange r : ranges) {
      if (r != null) {
        stripePoints.add(r.offsetInBlock);
        stripePoints.add(r.offsetInBlock + r.spanInBlock);
      }
    }

    long prev = -1;
    for (long point : stripePoints) {
      if (prev >= 0) {
        stripes.add(new AlignedStripe(prev, point - prev,
            dataBlkNum + parityBlkNum));
      }
      prev = point;
    }
    return stripes.toArray(new AlignedStripe[stripes.size()]);
  }

  /**
   * For every overlap between a cell and an aligned stripe, record in the
   * stripe's chunk where in {@code buf} the overlapping bytes belong.
   */
  private static void calculateChunkPositionsInBuf(int cellSize,
      AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
      int offsetInBuf) {
    /*
     *     | <--------------- AlignedStripe --------------->|
     *
     *     |<- length_0 ->|<--  length_1  -->|<- length_2 ->|
     * +------------------+------------------+----------------+
     * |    cell_0_0_0    |    cell_3_1_0    |   cell_6_2_0   |  <- blk_0
     * +------------------+------------------+----------------+
     *   _/                \_______________________
     *  |                                          |
     *  v offset_0                                 v offset_1
     * +----------------------------------------------------------+
     * |  cell_0_0_0 |  cell_1_0_1 and cell_2_0_2  |cell_3_1_0 ...|   <- buf
     * |  (partial)  |    (from blk_1 and blk_2)   |              |
     * +----------------------------------------------------------+
     *
     * Cell indexing convention defined in StripingCell
     */
    // number of bytes of buf already assigned to previous cells
    int done = 0;
    for (StripingCell cell : cells) {
      // widen before multiplying so the product cannot wrap around
      long cellStart = (long) cell.idxInInternalBlk * cellSize + cell.offset;
      long cellEnd = cellStart + cell.size - 1;
      for (AlignedStripe s : stripes) {
        long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
        long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
        long overlapEnd = Math.min(cellEnd, stripeEnd);
        int overLapLen = (int) (overlapEnd - overlapStart + 1);
        if (overLapLen <= 0) {
          continue;
        }
        if (s.chunks[cell.idxInStripe] == null) {
          s.chunks[cell.idxInStripe] = new StripingChunk(buf);
        }
        s.chunks[cell.idxInStripe].addByteArraySlice(
            (int) (offsetInBuf + done + overlapStart - cellStart), overLapLen);
      }
      done += cell.size;
    }
  }

  /**
   * If a {@link StripingChunk} maps to a byte range beyond an internal block's
   * size, the chunk should be treated as zero bytes in decoding.
   */
  private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup,
      AlignedStripe[] stripes, int cellSize, int dataBlkNum) {
    for (AlignedStripe s : stripes) {
      for (int i = 0; i < dataBlkNum; i++) {
        long internalBlkLen = getInternalBlockLength(blockGroup.getBlockSize(),
            cellSize, dataBlkNum, i);
        if (internalBlkLen <= s.getOffsetInBlock()) {
          Preconditions.checkState(s.chunks[i] == null);
          s.chunks[i] = new StripingChunk(StripingChunk.ALLZERO);
        }
      }
    }
  }

  /**
   * Cell is the unit of encoding used in {@link DFSStripedOutputStream}. This
   * size impacts how a logical offset in the file or block group translates
   * to physical byte offset in a stored internal block. The StripingCell util
   * class facilitates this calculation. Each StripingCell is inclusive with
   * its start and end offsets -- e.g., the end logical offset of cell_0_0_0
   * should be 1 byte lower than the start logical offset of cell_1_0_1.
   *
   *  | <------- Striped Block Group -------> |
   *    blk_0          blk_1          blk_2
   *      |              |              |
   *      v              v              v
   * +----------+   +----------+   +----------+
   * |cell_0_0_0|   |cell_1_0_1|   |cell_2_0_2|
   * +----------+   +----------+   +----------+
   * |cell_3_1_0|   |cell_4_1_1|   |cell_5_1_2| <- {@link #idxInBlkGroup} = 5
   * +----------+   +----------+   +----------+    {@link #idxInInternalBlk} = 1
   *                                               {@link #idxInStripe} = 2
   * A StripingCell is a special instance of {@link StripingChunk} whose offset
   * and size align with the cell used when writing data.
   * TODO: consider parity cells
   */
  @VisibleForTesting
  static class StripingCell {
    final ErasureCodingPolicy ecPolicy;
    /** Logical order in a block group, used when doing I/O to a block group */
    final int idxInBlkGroup;
    /** Index of the cell within its internal block (the row in the diagram) */
    final int idxInInternalBlk;
    /** Index of the internal block holding the cell (the column) */
    final int idxInStripe;
    /**
     * When a logical byte range is mapped to a set of cells, it might
     * partially overlap with the first and last cells. This field and the
     * {@link #size} variable represent the start offset and size of the
     * overlap.
     */
    final int offset;
    final int size;

    /**
     * @param ecPolicy The codec policy, used for the number of data units
     * @param cellSize Number of bytes of the cell covered by the byte range;
     *                 stored as {@link #size}
     * @param idxInBlkGroup Logical index of the cell in the block group
     * @param offset Start offset of the covered bytes within the cell
     */
    StripingCell(ErasureCodingPolicy ecPolicy, int cellSize, int idxInBlkGroup,
        int offset) {
      this.ecPolicy = ecPolicy;
      this.idxInBlkGroup = idxInBlkGroup;
      this.idxInInternalBlk = idxInBlkGroup / ecPolicy.getNumDataUnits();
      this.idxInStripe = idxInBlkGroup -
          this.idxInInternalBlk * ecPolicy.getNumDataUnits();
      this.offset = offset;
      this.size = cellSize;
    }
  }

  /**
   * Given a requested byte range on a striped block group, an AlignedStripe
   * represents an inclusive {@link VerticalRange} that is aligned with both
   * the byte range and boundaries of all internal blocks. As illustrated in
   * the diagram, any given byte range on a block group leads to 1~5
   * AlignedStripe's.
   *
   * |<-------- Striped Block Group -------->|
   * blk_0   blk_1   blk_2      blk_3   blk_4
   *                 +----+  |  +----+  +----+
   *                 |full|  |  |    |  |    | <- AlignedStripe0:
   *         +----+  |~~~~|  |  |~~~~|  |~~~~|      1st cell is partial
   *         |part|  |    |  |  |    |  |    | <- AlignedStripe1: byte range
   * +----+  +----+  +----+  |  |~~~~|  |~~~~|      doesn't start at 1st block
   * |full|  |full|  |full|  |  |    |  |    |
   * |cell|  |cell|  |cell|  |  |    |  |    | <- AlignedStripe2 (full stripe)
   * |    |  |    |  |    |  |  |    |  |    |
   * +----+  +----+  +----+  |  |~~~~|  |~~~~|
   * |full|  |part|          |  |    |  |    | <- AlignedStripe3: byte range
   * |~~~~|  +----+          |  |~~~~|  |~~~~|      doesn't end at last block
   * |    |                  |  |    |  |    | <- AlignedStripe4:
   * +----+                  |  +----+  +----+      last cell is partial
   *                         |
   * <---- data blocks ----> | <--- parity --->
   *
   * An AlignedStripe is the basic unit of reading from a striped block group,
   * because within the AlignedStripe, all internal blocks can be processed in
   * a uniform manner.
   *
   * The coverage of an AlignedStripe on an internal block is represented as a
   * {@link StripingChunk}.
   *
   * To simplify the logic of reading a logical byte range from a block group,
   * a StripingChunk is either completely in the requested byte range or
   * completely outside the requested byte range.
   */
  public static class AlignedStripe {
    public VerticalRange range;
    /** status of each chunk in the stripe */
    public final StripingChunk[] chunks;
    public int fetchedChunksNum = 0;
    public int missingChunksNum = 0;

    /**
     * @param offsetInBlock Start offset of the stripe in each internal block
     * @param length Number of bytes the stripe spans in each internal block
     * @param width Number of internal blocks (data + parity) in the stripe
     */
    public AlignedStripe(long offsetInBlock, long length, int width) {
      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
      this.range = new VerticalRange(offsetInBlock, length);
      this.chunks = new StripingChunk[width];
    }

    /** whether a position in an internal block is covered by this stripe */
    public boolean include(long pos) {
      return range.include(pos);
    }

    public long getOffsetInBlock() {
      return range.offsetInBlock;
    }

    public long getSpanInBlock() {
      return range.spanInBlock;
    }

    @Override
    public String toString() {
      return "Offset=" + range.offsetInBlock + ", length=" + range.spanInBlock +
          ", fetchedChunksNum=" + fetchedChunksNum +
          ", missingChunksNum=" + missingChunksNum;
    }
  }

  /**
   * A simple utility class representing an arbitrary vertical inclusive range
   * starting at {@link #offsetInBlock} and lasting for {@link #spanInBlock}
   * bytes in an internal block. Note that VerticalRange doesn't necessarily
   * align with {@link StripingCell}.
   *
   * |<- Striped Block Group ->|
   *  blk_0
   *    |
   *    v
   * +-----+
   * |~~~~~| <-- {@link #offsetInBlock}
   * |     |  ^
   * |     |  |
   * |     |  | {@link #spanInBlock}
   * |     |  v
   * |~~~~~| ---
   * |     |
   * +-----+
   */
  public static class VerticalRange {
    /** start offset in the internal block (inclusive) */
    public long offsetInBlock;
    /** length of the range in bytes */
    public long spanInBlock;

    public VerticalRange(long offsetInBlock, long length) {
      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
      this.offsetInBlock = offsetInBlock;
      this.spanInBlock = length;
    }

    /** whether a position is in the range */
    public boolean include(long pos) {
      return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock;
    }
  }

  /**
   * Indicates the coverage of an {@link AlignedStripe} on an internal block,
   * and the state of the chunk in the context of the read request.
   *
   * |<---------------- Striped Block Group --------------->|
   *   blk_0        blk_1        blk_2          blk_3   blk_4
   *                           +---------+  |  +----+  +----+
   *     null         null     |REQUESTED|  |  |null|  |null| <- AlignedStripe0
   *              +---------+  |---------|  |  |----|  |----|
   *     null     |REQUESTED|  |REQUESTED|  |  |null|  |null| <- AlignedStripe1
   * +---------+  +---------+  +---------+  |  +----+  +----+
   * |REQUESTED|  |REQUESTED|    ALLZERO    |  |null|  |null| <- AlignedStripe2
   * +---------+  +---------+               |  +----+  +----+
   * <----------- data blocks ------------> | <--- parity --->
   */
  public static class StripingChunk {
    /** Chunk has been successfully fetched */
    public static final int FETCHED = 0x01;
    /** Chunk has encountered failure when being fetched */
    public static final int MISSING = 0x02;
    /** Chunk being fetched (fetching task is in-flight) */
    public static final int PENDING = 0x04;
    /**
     * Chunk is requested either by application or for decoding, need to
     * schedule read task
     */
    public static final int REQUESTED = 0X08;
    /**
     * Internal block is short and has no overlap with chunk. Chunk considered
     * all-zero bytes in codec calculations.
     */
    public static final int ALLZERO = 0X0f;

    /**
     * If a chunk is completely in requested range, the state transition is:
     * REQUESTED (when AlignedStripe created) -> PENDING -> {FETCHED | MISSING}
     * If a chunk is completely outside requested range (including parity
     * chunks), state transition is:
     * null (AlignedStripe created) -> REQUESTED (upon failure) -> PENDING ...
     */
    public int state = REQUESTED;

    /** Destination when backed by a byte array; null otherwise */
    public final ChunkByteArray byteArray;
    /** Destination when backed by a ByteBuffer; null otherwise */
    public final ByteBuffer byteBuffer;

    public StripingChunk(byte[] buf) {
      this.byteArray = new ChunkByteArray(buf);
      byteBuffer = null;
    }

    public StripingChunk(ByteBuffer buf) {
      this.byteArray = null;
      this.byteBuffer = buf;
    }

    /** Create a chunk with no backing buffer, in the given state. */
    public StripingChunk(int state) {
      this.byteArray = null;
      this.byteBuffer = null;
      this.state = state;
    }

    /** Append a slice of the backing byte array to this chunk. */
    public void addByteArraySlice(int offset, int length) {
      assert byteArray != null;
      byteArray.offsetsInBuf.add(offset);
      byteArray.lengthsInBuf.add(length);
    }

    void copyTo(byte[] target) {
      assert byteArray != null;
      byteArray.copyTo(target);
    }

    void copyFrom(byte[] src) {
      assert byteArray != null;
      byteArray.copyFrom(src);
    }
  }

  /**
   * A byte array together with an ordered list of (offset, length) slices
   * into it. The slices, concatenated in order, form the content of a chunk.
   */
  public static class ChunkByteArray {
    private final byte[] buf;
    private final List<Integer> offsetsInBuf;
    private final List<Integer> lengthsInBuf;

    ChunkByteArray(byte[] buf) {
      this.buf = buf;
      this.offsetsInBuf = new ArrayList<>();
      this.lengthsInBuf = new ArrayList<>();
    }

    /** @return a copy of the slice start offsets, in insertion order */
    public int[] getOffsets() {
      int[] offsets = new int[offsetsInBuf.size()];
      for (int i = 0; i < offsets.length; i++) {
        offsets[i] = offsetsInBuf.get(i);
      }
      return offsets;
    }

    /** @return a copy of the slice lengths, in insertion order */
    public int[] getLengths() {
      int[] lens = new int[this.lengthsInBuf.size()];
      for (int i = 0; i < lens.length; i++) {
        lens[i] = this.lengthsInBuf.get(i);
      }
      return lens;
    }

    /** @return the backing array itself (not a copy) */
    public byte[] buf() {
      return buf;
    }

    /** Gather all slices, in order, into the beginning of target. */
    void copyTo(byte[] target) {
      int posInBuf = 0;
      for (int i = 0; i < offsetsInBuf.size(); i++) {
        System.arraycopy(buf, offsetsInBuf.get(i),
            target, posInBuf, lengthsInBuf.get(i));
        posInBuf += lengthsInBuf.get(i);
      }
    }

    /** Scatter the beginning of src, in order, into the slices. */
    void copyFrom(byte[] src) {
      int srcPos = 0;
      for (int j = 0; j < offsetsInBuf.size(); j++) {
        System.arraycopy(src, srcPos, buf, offsetsInBuf.get(j),
            lengthsInBuf.get(j));
        srcPos += lengthsInBuf.get(j);
      }
    }
  }

  /**
   * This class represents result from a striped read request.
   * If the task was successful or the internal computation failed,
   * an index is also returned.
   */
  public static class StripingChunkReadResult {
    public static final int SUCCESSFUL = 0x01;
    public static final int FAILED = 0x02;
    public static final int TIMEOUT = 0x04;
    public static final int CANCELLED = 0x08;

    public final int index;
    public final int state;

    /** Create a TIMEOUT result; its index is -1. */
    public StripingChunkReadResult(int state) {
      Preconditions.checkArgument(state == TIMEOUT,
          "Only timeout result should return negative index.");
      this.index = -1;
      this.state = state;
    }

    /** Create a result for a completed task; state must not be TIMEOUT. */
    public StripingChunkReadResult(int index, int state) {
      Preconditions.checkArgument(state != TIMEOUT,
          "Timeout result should return negative index.");
      this.index = index;
      this.state = state;
    }

    @Override
    public String toString() {
      return "(index=" + index + ", state =" + state + ")";
    }
  }

  /**
   * Check if the information such as IDs and generation stamps in block-i
   * match the block group.
   *
   * @param blockGroup The block group
   * @param i The logical index of the internal block in the group
   * @param blocki The internal block to verify
   * @throws IOException if the block pool ID, the block ID or the generation
   *         stamp of {@code blocki} does not match the block group
   */
  public static void checkBlocks(ExtendedBlock blockGroup,
      int i, ExtendedBlock blocki) throws IOException {
    if (!blocki.getBlockPoolId().equals(blockGroup.getBlockPoolId())) {
      throw new IOException("Block pool IDs mismatched: block" + i + "="
          + blocki + ", expected block group=" + blockGroup);
    }
    if (blocki.getBlockId() - i != blockGroup.getBlockId()) {
      throw new IOException("Block IDs mismatched: block" + i + "="
          + blocki + ", expected block group=" + blockGroup);
    }
    if (blocki.getGenerationStamp() != blockGroup.getGenerationStamp()) {
      throw new IOException("Generation stamps mismatched: block" + i + "="
          + blocki + ", expected block group=" + blockGroup);
    }
  }

  /**
   * Extract the low 4 bits of the ID of a block.
   *
   * @param reportedBlock The block
   * @return {@code blockId & 15}
   */
  public static int getBlockIndex(Block reportedBlock) {
    return (int) (reportedBlock.getBlockId() &
        BLOCK_GROUP_INDEX_MASK);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index b28ab42,0e2d541..d35fb57 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@@ -648,3 -444,3 +478,11 @@@ message RollingUpgradeStatusProto required string blockPoolId = 1; optional bool finalized = 2 [default = false]; } ++ ++ ++/** ++ * A list of storage IDs. ++ */ ++message StorageUuidsProto { ++ repeated string storageUuids = 1; ++} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 8874c4d,b631955..0166029 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@@ -402,14 -397,11 +400,19 @@@ public class DFSConfigKeys extends Comm public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600; public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads"; public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1; + public static final String DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads"; + public static final int DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20; + public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size"; + public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024; + public static final String 
DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY = "dfs.datanode.stripedread.timeout.millis"; + public static final int DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT = 5000; //5s + public static final String DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY = "dfs.datanode.striped.blockrecovery.threads.size"; + public static final int DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT = 8; + public static final String + DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY = + "dfs.datanode.directoryscan.throttle.limit.ms.per.sec"; + public static final int + DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = 1000; public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface"; public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default"; public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 7f721f0,5b11ac2..b0ea7ce --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@@ -1442,34 -1439,5 +1441,4 @@@ public class DFSUtil .createKeyProviderCryptoExtension(keyProvider); return cryptoProvider; } - - public static int getIoFileBufferSize(Configuration conf) { - return conf.getInt( - CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, - CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); - } - - public static int getSmallBufferSize(Configuration conf) { - return Math.min(getIoFileBufferSize(conf) / 2, 512); - } - - /** - * Probe for HDFS Encryption being enabled; this uses the value of - * the option {@link 
DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI}, - * returning true if that property contains a non-empty, non-whitespace - * string. - * @param conf configuration to probe - * @return true if encryption is considered enabled. - */ - public static boolean isHDFSEncryptionEnabled(Configuration conf) { - return !conf.getTrimmed( - DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty(); - } -- - public static InterruptedIOException toInterruptedIOException(String message, - InterruptedException e) { - final InterruptedIOException iioe = new InterruptedIOException(message); - iioe.initCause(e); - return iioe; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 524248c,75b3811..05c498f --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@@ -47,36 -45,27 +47,35 @@@ import org.apache.hadoop.hdfs.protocol. 
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto; ++import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto; - import 
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RecoveringBlockProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto; - import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlockKeyProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlockWithLocationsProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlocksWithLocationsProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.CheckpointCommandProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.CheckpointSignatureProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.ExportedBlockKeysProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto.NamenodeRoleProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamespaceInfoProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RecoveringBlockProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogManifestProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogProto; + import 
org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.ReplicaStateProto; + import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; import org.apache.hadoop.hdfs.security.token.block.BlockKey; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 9228bec,2646089..43ddf74 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@@ -364,10 -368,9 +370,9 @@@ public class DataNode extends Reconfigu private String supergroup; private boolean isPermissionEnabled; private String dnUserName = null; - - private SpanReceiverHost spanReceiverHost; -- + private ErasureCodingWorker 
ecWorker; + final Tracer tracer; + private final TracerConfigurationManager tracerConfigurationManager; private static final int NUM_CORES = Runtime.getRuntime() .availableProcessors(); private static final double CONGESTION_RATIO = 1.5; @@@ -3289,12 -3287,8 +3320,12 @@@ @Override public void removeSpanReceiver(long id) throws IOException { checkSuperuserPrivilege(); - spanReceiverHost.removeSpanReceiver(id); + tracerConfigurationManager.removeSpanReceiver(id); } + + public ErasureCodingWorker getErasureCodingWorker(){ + return ecWorker; + } /** * Get timeout value of each OOB type from configuration