http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java new file mode 100644 index 0000000..4dc94a0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -0,0 +1,947 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.util; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSStripedOutputStream; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.security.token.Token; + +import java.nio.ByteBuffer; +import java.util.*; +import java.io.IOException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * When accessing a file in striped layout, operations on logical byte ranges + * in the file need to be mapped to physical byte ranges on block files stored + * on DataNodes. This utility class facilities this mapping by defining and + * exposing a number of striping-related concepts. The most basic ones are + * illustrated in the following diagram. Unless otherwise specified, all + * range-related calculations are inclusive (the end offset of the previous + * range should be 1 byte lower than the start offset of the next one). + * + * | <---- Block Group ----> | <- Block Group: logical unit composing + * | | striped HDFS files. 
+ blk_0 blk_1 blk_2 <- Internal Blocks: each internal block + | | | represents a physically stored local + v v v block file + +------+ +------+ +------+ + |cell_0| |cell_1| |cell_2| <- {@link StripingCell} represents the + +------+ +------+ +------+ logical order that a Block Group should + |cell_3| |cell_4| |cell_5| be accessed: cell_0, cell_1, ... + +------+ +------+ +------+ + |cell_6| |cell_7| |cell_8| + +------+ +------+ +------+ + |cell_9| + +------+ <- A cell contains cellSize bytes of data + */ +@InterfaceAudience.Private +public class StripedBlockUtil { + + /** + * This method parses a striped block group into individual blocks. + * + * @param bg The striped block group + * @param cellSize The size of a striping cell + * @param dataBlkNum The number of data blocks + * @return An array containing the blocks in the group + */ + public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg, + int cellSize, int dataBlkNum, int parityBlkNum) { + int locatedBGSize = bg.getBlockIndices().length; + LocatedBlock[] lbs = new LocatedBlock[dataBlkNum + parityBlkNum]; + for (short i = 0; i < locatedBGSize; i++) { + final int idx = bg.getBlockIndices()[i]; + // for now we do not use redundant replica of an internal block + if (idx < (dataBlkNum + parityBlkNum) && lbs[idx] == null) { + lbs[idx] = constructInternalBlock(bg, i, cellSize, + dataBlkNum, idx); + } + } + return lbs; + } + + /** + * This method creates an internal block at the given index of a block group + * + * @param idxInReturnedLocs The index in the stored locations in the + * {@link LocatedStripedBlock} object + * @param idxInBlockGroup The logical index in the striped block group + * @return The constructed internal block + */ + public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg, + int idxInReturnedLocs, int cellSize, int dataBlkNum, + int idxInBlockGroup) { + final ExtendedBlock blk = constructInternalBlock( + bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup); + final LocatedBlock locatedBlock; + if (idxInReturnedLocs < bg.getLocations().length) { + locatedBlock = new LocatedBlock(blk, + new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]}, + new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, + new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, + bg.getStartOffset(), bg.isCorrupt(), null); + } else { + locatedBlock = new LocatedBlock(blk, null, null, null, + bg.getStartOffset(), bg.isCorrupt(), null); + } + Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens(); + if (idxInReturnedLocs < blockTokens.length) { + locatedBlock.setBlockToken(blockTokens[idxInReturnedLocs]); + } + return locatedBlock; + } + + /** + * This method creates an internal {@link ExtendedBlock} at the given index + * of a block group.
+ */ + public static ExtendedBlock constructInternalBlock(ExtendedBlock blockGroup, + int cellSize, int dataBlkNum, int idxInBlockGroup) { + ExtendedBlock block = new ExtendedBlock(blockGroup); + block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup); + block.setNumBytes(getInternalBlockLength(blockGroup.getNumBytes(), + cellSize, dataBlkNum, idxInBlockGroup)); + return block; + } + + /** + * Get the size of an internal block at the given index of a block group + * + * @param dataSize Size of the block group only counting data blocks + * @param cellSize The size of a striping cell + * @param numDataBlocks The number of data blocks + * @param i The logical index in the striped block group + * @return The size of the internal block at the specified index + */ + public static long getInternalBlockLength(long dataSize, + int cellSize, int numDataBlocks, int i) { + Preconditions.checkArgument(dataSize >= 0); + Preconditions.checkArgument(cellSize > 0); + Preconditions.checkArgument(numDataBlocks > 0); + Preconditions.checkArgument(i >= 0); + // Size of each stripe (only counting data blocks) + final int stripeSize = cellSize * numDataBlocks; + // If block group ends at stripe boundary, each internal block has an equal + // share of the group + final int lastStripeDataLen = (int)(dataSize % stripeSize); + if (lastStripeDataLen == 0) { + return dataSize / numDataBlocks; + } + + final int numStripes = (int) ((dataSize - 1) / stripeSize + 1); + return (numStripes - 1L)*cellSize + + lastCellSize(lastStripeDataLen, cellSize, numDataBlocks, i); + } + + private static int lastCellSize(int size, int cellSize, int numDataBlocks, + int i) { + if (i < numDataBlocks) { + // parity block size (i.e. i >= numDataBlocks) is the same as + // the first data block size (i.e. i = 0). + size -= i*cellSize; + if (size < 0) { + size = 0; + } + } + return size > cellSize? cellSize: size; + } + + /** + * Given a byte's offset in an internal block, calculate the offset in + * the block group + */ + public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum, + long offsetInBlk, int idxInBlockGroup) { + int cellIdxInBlk = (int) (offsetInBlk / cellSize); + return cellIdxInBlk * cellSize * dataBlkNum // n full stripes before offset + + idxInBlockGroup * cellSize // m full cells before offset + + offsetInBlk % cellSize; // partial cell + } + + /** + * Get the next completed striped read task + * + * @return {@link StripingChunkReadResult} indicating the status of the read task + * succeeded, and the block index of the task. If the method times + * out without getting any completed read tasks, -1 is returned as + * block index. 
+ * @throws InterruptedException + */ + public static StripingChunkReadResult getNextCompletedStripedRead( + CompletionService<Void> readService, Map<Future<Void>, Integer> futures, + final long threshold) throws InterruptedException { + Preconditions.checkArgument(!futures.isEmpty()); + Future<Void> future = null; + try { + if (threshold > 0) { + future = readService.poll(threshold, TimeUnit.MILLISECONDS); + } else { + future = readService.take(); + } + if (future != null) { + future.get(); + return new StripingChunkReadResult(futures.remove(future), + StripingChunkReadResult.SUCCESSFUL); + } else { + return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT); + } + } catch (ExecutionException e) { + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("ExecutionException " + e); + } + return new StripingChunkReadResult(futures.remove(future), + StripingChunkReadResult.FAILED); + } catch (CancellationException e) { + return new StripingChunkReadResult(futures.remove(future), + StripingChunkReadResult.CANCELLED); + } + } + + /** + * Get the total usage of the striped blocks, which is the total of data + * blocks and parity blocks + * + * @param numDataBlkBytes + * Size of the block group only counting data blocks + * @param dataBlkNum + * The number of data blocks + * @param parityBlkNum + * The number of parity blocks + * @param cellSize + * The size of a striping cell + * @return The total usage of data blocks and parity blocks + */ + public static long spaceConsumedByStripedBlock(long numDataBlkBytes, + int dataBlkNum, int parityBlkNum, int cellSize) { + int parityIndex = dataBlkNum + 1; + long numParityBlkBytes = getInternalBlockLength(numDataBlkBytes, cellSize, + dataBlkNum, parityIndex) * parityBlkNum; + return numDataBlkBytes + numParityBlkBytes; + } + + /** + * Initialize the decoding input buffers based on the chunk states in an + * {@link AlignedStripe}. For each chunk that was not initially requested, + * schedule a new fetch request with the decoding input buffer as transfer + * destination. + */ + public static byte[][] initDecodeInputs(AlignedStripe alignedStripe, + int dataBlkNum, int parityBlkNum) { + byte[][] decodeInputs = + new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()]; + // read the full data aligned stripe + for (int i = 0; i < dataBlkNum; i++) { + if (alignedStripe.chunks[i] == null) { + final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum); + alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]); + alignedStripe.chunks[i].addByteArraySlice(0, + (int) alignedStripe.getSpanInBlock()); + } + } + return decodeInputs; + } + + /** + * Some fetched {@link StripingChunk} might be stored in original application + * buffer instead of prepared decode input buffers. Some others are beyond + * the range of the internal blocks and should correspond to all zero bytes. + * When all pending requests have returned, this method should be called to + * finalize decode input buffers. 
+ */ + public static void finalizeDecodeInputs(final byte[][] decodeInputs, + int dataBlkNum, int parityBlkNum, AlignedStripe alignedStripe) { + for (int i = 0; i < alignedStripe.chunks.length; i++) { + final StripingChunk chunk = alignedStripe.chunks[i]; + final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum); + if (chunk != null && chunk.state == StripingChunk.FETCHED) { + chunk.copyTo(decodeInputs[decodeIndex]); + } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) { + Arrays.fill(decodeInputs[decodeIndex], (byte) 0); + } else { + decodeInputs[decodeIndex] = null; + } + } + } + + /** + * Currently decoding requires parity chunks are before data chunks. + * The indices are opposite to what we store in NN. In future we may + * improve the decoding to make the indices order the same as in NN. + * + * @param index The index to convert + * @param dataBlkNum The number of data blocks + * @param parityBlkNum The number of parity blocks + * @return converted index + */ + public static int convertIndex4Decode(int index, int dataBlkNum, + int parityBlkNum) { + return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum; + } + + public static int convertDecodeIndexBack(int index, int dataBlkNum, + int parityBlkNum) { + return index < parityBlkNum ? index + dataBlkNum : index - parityBlkNum; + } + + /** + * Decode based on the given input buffers and schema. + */ + public static void decodeAndFillBuffer(final byte[][] decodeInputs, + AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum, + RawErasureDecoder decoder) { + // Step 1: prepare indices and output buffers for missing data units + int[] decodeIndices = new int[parityBlkNum]; + int pos = 0; + for (int i = 0; i < dataBlkNum; i++) { + if (alignedStripe.chunks[i] != null && + alignedStripe.chunks[i].state == StripingChunk.MISSING){ + decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum); + } + } + decodeIndices = Arrays.copyOf(decodeIndices, pos); + byte[][] decodeOutputs = + new byte[decodeIndices.length][(int) alignedStripe.getSpanInBlock()]; + + // Step 2: decode into prepared output buffers + decoder.decode(decodeInputs, decodeIndices, decodeOutputs); + + // Step 3: fill original application buffer with decoded data + for (int i = 0; i < decodeIndices.length; i++) { + int missingBlkIdx = convertDecodeIndexBack(decodeIndices[i], + dataBlkNum, parityBlkNum); + StripingChunk chunk = alignedStripe.chunks[missingBlkIdx]; + if (chunk.state == StripingChunk.MISSING) { + chunk.copyFrom(decodeOutputs[i]); + } + } + } + + /** + * Similar functionality with {@link #divideByteRangeIntoStripes}, but is used + * by stateful read and uses ByteBuffer as reading target buffer. Besides the + * read range is within a single stripe thus the calculation logic is simpler. 
+ */ + public static AlignedStripe[] divideOneStripe(ECSchema ecSchema, + int cellSize, LocatedStripedBlock blockGroup, long rangeStartInBlockGroup, + long rangeEndInBlockGroup, ByteBuffer buf) { + final int dataBlkNum = ecSchema.getNumDataUnits(); + // Step 1: map the byte range to StripingCells + StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, cellSize, + blockGroup, rangeStartInBlockGroup, rangeEndInBlockGroup); + + // Step 2: get the unmerged ranges on each internal block + VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, cellSize, + cells); + + // Step 3: merge into stripes + AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges); + + // Step 4: calculate each chunk's position in destination buffer. Since the + // whole read range is within a single stripe, the logic is simpler here. + int bufOffset = (int) (rangeStartInBlockGroup % (cellSize * dataBlkNum)); + for (StripingCell cell : cells) { + long cellStart = cell.idxInInternalBlk * cellSize + cell.offset; + long cellEnd = cellStart + cell.size - 1; + for (AlignedStripe s : stripes) { + long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1; + long overlapStart = Math.max(cellStart, s.getOffsetInBlock()); + long overlapEnd = Math.min(cellEnd, stripeEnd); + int overLapLen = (int) (overlapEnd - overlapStart + 1); + if (overLapLen > 0) { + Preconditions.checkState(s.chunks[cell.idxInStripe] == null); + final int pos = (int) (bufOffset + overlapStart - cellStart); + buf.position(pos); + buf.limit(pos + overLapLen); + s.chunks[cell.idxInStripe] = new StripingChunk(buf.slice()); + } + } + bufOffset += cell.size; + } + + // Step 5: prepare ALLZERO blocks + prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum); + return stripes; + } + + /** + * This method divides a requested byte range into an array of inclusive + * {@link AlignedStripe}. + * @param ecSchema The codec schema for the file, which carries the numbers + * of data / parity blocks + * @param cellSize Cell size of stripe + * @param blockGroup The striped block group + * @param rangeStartInBlockGroup The byte range's start offset in block group + * @param rangeEndInBlockGroup The byte range's end offset in block group + * @param buf Destination buffer of the read operation for the byte range + * @param offsetInBuf Start offset into the destination buffer + * + * At most 5 stripes will be generated from each logical range, as + * demonstrated in the header of {@link AlignedStripe}. 
+ */ + public static AlignedStripe[] divideByteRangeIntoStripes(ECSchema ecSchema, + int cellSize, LocatedStripedBlock blockGroup, + long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf, + int offsetInBuf) { + + // Step 0: analyze range and calculate basic parameters + final int dataBlkNum = ecSchema.getNumDataUnits(); + + // Step 1: map the byte range to StripingCells + StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, cellSize, + blockGroup, rangeStartInBlockGroup, rangeEndInBlockGroup); + + // Step 2: get the unmerged ranges on each internal block + VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, cellSize, + cells); + + // Step 3: merge into at most 5 stripes + AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges); + + // Step 4: calculate each chunk's position in destination buffer + calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf); + + // Step 5: prepare ALLZERO blocks + prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum); + + return stripes; + } + + /** + * Map the logical byte range to a set of inclusive {@link StripingCell} + * instances, each representing the overlap of the byte range to a cell + * used by {@link DFSStripedOutputStream} in encoding + */ + @VisibleForTesting + private static StripingCell[] getStripingCellsOfByteRange(ECSchema ecSchema, + int cellSize, LocatedStripedBlock blockGroup, + long rangeStartInBlockGroup, long rangeEndInBlockGroup) { + Preconditions.checkArgument( + rangeStartInBlockGroup <= rangeEndInBlockGroup && + rangeEndInBlockGroup < blockGroup.getBlockSize()); + long len = rangeEndInBlockGroup - rangeStartInBlockGroup + 1; + int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize); + int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize); + int numCells = lastCellIdxInBG - firstCellIdxInBG + 1; + StripingCell[] cells = new StripingCell[numCells]; + + final int firstCellOffset = (int) (rangeStartInBlockGroup % cellSize); + final int firstCellSize = + (int) Math.min(cellSize - (rangeStartInBlockGroup % cellSize), len); + cells[0] = new StripingCell(ecSchema, firstCellSize, firstCellIdxInBG, + firstCellOffset); + if (lastCellIdxInBG != firstCellIdxInBG) { + final int lastCellSize = (int) (rangeEndInBlockGroup % cellSize) + 1; + cells[numCells - 1] = new StripingCell(ecSchema, lastCellSize, + lastCellIdxInBG, 0); + } + + for (int i = 1; i < numCells - 1; i++) { + cells[i] = new StripingCell(ecSchema, cellSize, i + firstCellIdxInBG, 0); + } + + return cells; + } + + /** + * Given a logical byte range, mapped to each {@link StripingCell}, calculate + * the physical byte range (inclusive) on each stored internal block. 
+ */ + @VisibleForTesting + private static VerticalRange[] getRangesForInternalBlocks(ECSchema ecSchema, + int cellSize, StripingCell[] cells) { + int dataBlkNum = ecSchema.getNumDataUnits(); + int parityBlkNum = ecSchema.getNumParityUnits(); + + VerticalRange ranges[] = new VerticalRange[dataBlkNum + parityBlkNum]; + + long earliestStart = Long.MAX_VALUE; + long latestEnd = -1; + for (StripingCell cell : cells) { + // iterate through all cells and update the list of StripeRanges + if (ranges[cell.idxInStripe] == null) { + ranges[cell.idxInStripe] = new VerticalRange( + cell.idxInInternalBlk * cellSize + cell.offset, cell.size); + } else { + ranges[cell.idxInStripe].spanInBlock += cell.size; + } + VerticalRange range = ranges[cell.idxInStripe]; + if (range.offsetInBlock < earliestStart) { + earliestStart = range.offsetInBlock; + } + if (range.offsetInBlock + range.spanInBlock - 1 > latestEnd) { + latestEnd = range.offsetInBlock + range.spanInBlock - 1; + } + } + + // Each parity block should be fetched at maximum range of all data blocks + for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) { + ranges[i] = new VerticalRange(earliestStart, + latestEnd - earliestStart + 1); + } + + return ranges; + } + + /** + * Merge byte ranges on each internal block into a set of inclusive + * {@link AlignedStripe} instances. + */ + private static AlignedStripe[] mergeRangesForInternalBlocks( + ECSchema ecSchema, VerticalRange[] ranges) { + int dataBlkNum = ecSchema.getNumDataUnits(); + int parityBlkNum = ecSchema.getNumParityUnits(); + List<AlignedStripe> stripes = new ArrayList<>(); + SortedSet<Long> stripePoints = new TreeSet<>(); + for (VerticalRange r : ranges) { + if (r != null) { + stripePoints.add(r.offsetInBlock); + stripePoints.add(r.offsetInBlock + r.spanInBlock); + } + } + + long prev = -1; + for (long point : stripePoints) { + if (prev >= 0) { + stripes.add(new AlignedStripe(prev, point - prev, + dataBlkNum + parityBlkNum)); + } + prev = point; + } + return stripes.toArray(new AlignedStripe[stripes.size()]); + } + + private static void calcualteChunkPositionsInBuf(int cellSize, + AlignedStripe[] stripes, StripingCell[] cells, byte[] buf, + int offsetInBuf) { + /** + * | <--------------- AlignedStripe --------------->| + * + * |<- length_0 ->|<-- length_1 -->|<- length_2 ->| + * +------------------+------------------+----------------+ + * | cell_0_0_0 | cell_3_1_0 | cell_6_2_0 | <- blk_0 + * +------------------+------------------+----------------+ + * _/ \_______________________ + * | | + * v offset_0 v offset_1 + * +----------------------------------------------------------+ + * | cell_0_0_0 | cell_1_0_1 and cell_2_0_2 |cell_3_1_0 ...| <- buf + * | (partial) | (from blk_1 and blk_2) | | + * +----------------------------------------------------------+ + * + * Cell indexing convention defined in {@link StripingCell} + */ + int done = 0; + for (StripingCell cell : cells) { + long cellStart = cell.idxInInternalBlk * cellSize + cell.offset; + long cellEnd = cellStart + cell.size - 1; + for (AlignedStripe s : stripes) { + long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1; + long overlapStart = Math.max(cellStart, s.getOffsetInBlock()); + long overlapEnd = Math.min(cellEnd, stripeEnd); + int overLapLen = (int) (overlapEnd - overlapStart + 1); + if (overLapLen <= 0) { + continue; + } + if (s.chunks[cell.idxInStripe] == null) { + s.chunks[cell.idxInStripe] = new StripingChunk(buf); + } + s.chunks[cell.idxInStripe].addByteArraySlice( + (int)(offsetInBuf + done + overlapStart - 
cellStart), overLapLen); + } + done += cell.size; + } + } + + /** + * If a {@link StripingChunk} maps to a byte range beyond an internal block's + * size, the chunk should be treated as zero bytes in decoding. + */ + private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup, + AlignedStripe[] stripes, int cellSize, int dataBlkNum) { + for (AlignedStripe s : stripes) { + for (int i = 0; i < dataBlkNum; i++) { + long internalBlkLen = getInternalBlockLength(blockGroup.getBlockSize(), + cellSize, dataBlkNum, i); + if (internalBlkLen <= s.getOffsetInBlock()) { + Preconditions.checkState(s.chunks[i] == null); + s.chunks[i] = new StripingChunk(StripingChunk.ALLZERO); + } + } + } + } + + /** + * Cell is the unit of encoding used in {@link DFSStripedOutputStream}. This + * size impacts how a logical offset in the file or block group translates + * to physical byte offset in a stored internal block. The StripingCell util + * class facilitates this calculation. Each StripingCell is inclusive with + * its start and end offsets -- e.g., the end logical offset of cell_0_0_0 + * should be 1 byte lower than the start logical offset of cell_1_0_1. + * + * | <------- Striped Block Group -------> | + * blk_0 blk_1 blk_2 + * | | | + * v v v + * +----------+ +----------+ +----------+ + * |cell_0_0_0| |cell_1_0_1| |cell_2_0_2| + * +----------+ +----------+ +----------+ + * |cell_3_1_0| |cell_4_1_1| |cell_5_1_2| <- {@link #idxInBlkGroup} = 5 + * +----------+ +----------+ +----------+ {@link #idxInInternalBlk} = 1 + * {@link #idxInStripe} = 2 + * A StripingCell is a special instance of {@link StripingChunk} whose offset + * and size align with the cell used when writing data. + * TODO: consider parity cells + */ + @VisibleForTesting + static class StripingCell { + final ECSchema schema; + /** Logical order in a block group, used when doing I/O to a block group */ + final int idxInBlkGroup; + final int idxInInternalBlk; + final int idxInStripe; + /** + * When a logical byte range is mapped to a set of cells, it might + * partially overlap with the first and last cells. This field and the + * {@link #size} variable represent the start offset and size of the + * overlap. + */ + final int offset; + final int size; + + StripingCell(ECSchema ecSchema, int cellSize, int idxInBlkGroup, + int offset) { + this.schema = ecSchema; + this.idxInBlkGroup = idxInBlkGroup; + this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits(); + this.idxInStripe = idxInBlkGroup - + this.idxInInternalBlk * ecSchema.getNumDataUnits(); + this.offset = offset; + this.size = cellSize; + } + } + + /** + * Given a requested byte range on a striped block group, an AlignedStripe + * represents an inclusive {@link VerticalRange} that is aligned with both + * the byte range and boundaries of all internal blocks. As illustrated in + * the diagram, any given byte range on a block group leads to 1~5 + * AlignedStripe's. 
+ * + * |<-------- Striped Block Group -------->| + * blk_0 blk_1 blk_2 blk_3 blk_4 + * +----+ | +----+ +----+ + * |full| | | | | | <- AlignedStripe0: + * +----+ |~~~~| | |~~~~| |~~~~| 1st cell is partial + * |part| | | | | | | | <- AlignedStripe1: byte range + * +----+ +----+ +----+ | |~~~~| |~~~~| doesn't start at 1st block + * |full| |full| |full| | | | | | + * |cell| |cell| |cell| | | | | | <- AlignedStripe2 (full stripe) + * | | | | | | | | | | | + * +----+ +----+ +----+ | |~~~~| |~~~~| + * |full| |part| | | | | | <- AlignedStripe3: byte range + * |~~~~| +----+ | |~~~~| |~~~~| doesn't end at last block + * | | | | | | | <- AlignedStripe4: + * +----+ | +----+ +----+ last cell is partial + * | + * <---- data blocks ----> | <--- parity ---> + * + * An AlignedStripe is the basic unit of reading from a striped block group, + * because within the AlignedStripe, all internal blocks can be processed in + * a uniform manner. + * + * The coverage of an AlignedStripe on an internal block is represented as a + * {@link StripingChunk}. + * + * To simplify the logic of reading a logical byte range from a block group, + * a StripingChunk is either completely in the requested byte range or + * completely outside the requested byte range. + */ + public static class AlignedStripe { + public VerticalRange range; + /** status of each chunk in the stripe */ + public final StripingChunk[] chunks; + public int fetchedChunksNum = 0; + public int missingChunksNum = 0; + + public AlignedStripe(long offsetInBlock, long length, int width) { + Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0); + this.range = new VerticalRange(offsetInBlock, length); + this.chunks = new StripingChunk[width]; + } + + public boolean include(long pos) { + return range.include(pos); + } + + public long getOffsetInBlock() { + return range.offsetInBlock; + } + + public long getSpanInBlock() { + return range.spanInBlock; + } + + @Override + public String toString() { + return "Offset=" + range.offsetInBlock + ", length=" + range.spanInBlock + + ", fetchedChunksNum=" + fetchedChunksNum + + ", missingChunksNum=" + missingChunksNum; + } + } + + /** + * A simple utility class representing an arbitrary vertical inclusive range + * starting at {@link #offsetInBlock} and lasting for {@link #spanInBlock} + * bytes in an internal block. Note that VerticalRange doesn't necessarily + * align with {@link StripingCell}. + * + * |<- Striped Block Group ->| + * blk_0 + * | + * v + * +-----+ + * |~~~~~| <-- {@link #offsetInBlock} + * | | ^ + * | | | + * | | | {@link #spanInBlock} + * | | v + * |~~~~~| --- + * | | + * +-----+ + */ + public static class VerticalRange { + /** start offset in the block group (inclusive) */ + public long offsetInBlock; + /** length of the stripe range */ + public long spanInBlock; + + public VerticalRange(long offsetInBlock, long length) { + Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0); + this.offsetInBlock = offsetInBlock; + this.spanInBlock = length; + } + + /** whether a position is in the range */ + public boolean include(long pos) { + return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock; + } + } + + /** + * Indicates the coverage of an {@link AlignedStripe} on an internal block, + * and the state of the chunk in the context of the read request. 
+ * + * |<---------------- Striped Block Group --------------->| + * blk_0 blk_1 blk_2 blk_3 blk_4 + * +---------+ | +----+ +----+ + * null null |REQUESTED| | |null| |null| <- AlignedStripe0 + * +---------+ |---------| | |----| |----| + * null |REQUESTED| |REQUESTED| | |null| |null| <- AlignedStripe1 + * +---------+ +---------+ +---------+ | +----+ +----+ + * |REQUESTED| |REQUESTED| ALLZERO | |null| |null| <- AlignedStripe2 + * +---------+ +---------+ | +----+ +----+ + * <----------- data blocks ------------> | <--- parity ---> + */ + public static class StripingChunk { + /** Chunk has been successfully fetched */ + public static final int FETCHED = 0x01; + /** Chunk has encountered failed when being fetched */ + public static final int MISSING = 0x02; + /** Chunk being fetched (fetching task is in-flight) */ + public static final int PENDING = 0x04; + /** + * Chunk is requested either by application or for decoding, need to + * schedule read task + */ + public static final int REQUESTED = 0X08; + /** + * Internal block is short and has no overlap with chunk. Chunk considered + * all-zero bytes in codec calculations. + */ + public static final int ALLZERO = 0X0f; + + /** + * If a chunk is completely in requested range, the state transition is: + * REQUESTED (when AlignedStripe created) -> PENDING -> {FETCHED | MISSING} + * If a chunk is completely outside requested range (including parity + * chunks), state transition is: + * null (AlignedStripe created) -> REQUESTED (upon failure) -> PENDING ... + */ + public int state = REQUESTED; + + public final ChunkByteArray byteArray; + public final ByteBuffer byteBuffer; + + public StripingChunk(byte[] buf) { + this.byteArray = new ChunkByteArray(buf); + byteBuffer = null; + } + + public StripingChunk(ByteBuffer buf) { + this.byteArray = null; + this.byteBuffer = buf; + } + + public StripingChunk(int state) { + this.byteArray = null; + this.byteBuffer = null; + this.state = state; + } + + public void addByteArraySlice(int offset, int length) { + assert byteArray != null; + byteArray.offsetsInBuf.add(offset); + byteArray.lengthsInBuf.add(length); + } + + void copyTo(byte[] target) { + assert byteArray != null; + byteArray.copyTo(target); + } + + void copyFrom(byte[] src) { + assert byteArray != null; + byteArray.copyFrom(src); + } + } + + public static class ChunkByteArray { + private final byte[] buf; + private final List<Integer> offsetsInBuf; + private final List<Integer> lengthsInBuf; + + ChunkByteArray(byte[] buf) { + this.buf = buf; + this.offsetsInBuf = new ArrayList<>(); + this.lengthsInBuf = new ArrayList<>(); + } + + public int[] getOffsets() { + int[] offsets = new int[offsetsInBuf.size()]; + for (int i = 0; i < offsets.length; i++) { + offsets[i] = offsetsInBuf.get(i); + } + return offsets; + } + + public int[] getLengths() { + int[] lens = new int[this.lengthsInBuf.size()]; + for (int i = 0; i < lens.length; i++) { + lens[i] = this.lengthsInBuf.get(i); + } + return lens; + } + + public byte[] buf() { + return buf; + } + + void copyTo(byte[] target) { + int posInBuf = 0; + for (int i = 0; i < offsetsInBuf.size(); i++) { + System.arraycopy(buf, offsetsInBuf.get(i), + target, posInBuf, lengthsInBuf.get(i)); + posInBuf += lengthsInBuf.get(i); + } + } + + void copyFrom(byte[] src) { + int srcPos = 0; + for (int j = 0; j < offsetsInBuf.size(); j++) { + System.arraycopy(src, srcPos, buf, offsetsInBuf.get(j), + lengthsInBuf.get(j)); + srcPos += lengthsInBuf.get(j); + } + } + } + + /** + * This class represents result from a striped read 
request. + * If the task was successful or the internal computation failed, + * an index is also returned. + */ + public static class StripingChunkReadResult { + public static final int SUCCESSFUL = 0x01; + public static final int FAILED = 0x02; + public static final int TIMEOUT = 0x04; + public static final int CANCELLED = 0x08; + + public final int index; + public final int state; + + public StripingChunkReadResult(int state) { + Preconditions.checkArgument(state == TIMEOUT, + "Only timeout result should return negative index."); + this.index = -1; + this.state = state; + } + + public StripingChunkReadResult(int index, int state) { + Preconditions.checkArgument(state != TIMEOUT, + "Timeout result should return negative index."); + this.index = index; + this.state = state; + } + + @Override + public String toString() { + return "(index=" + index + ", state =" + state + ")"; + } + } + + /** + * Check if the information such as IDs and generation stamps in block-i + * match block-j, where block-i and block-j are in the same group. + */ + public static void checkBlocks(int j, ExtendedBlock blockj, + int i, ExtendedBlock blocki) throws IOException { + + if (!blocki.getBlockPoolId().equals(blockj.getBlockPoolId())) { + throw new IOException("Block pool IDs mismatched: block" + j + "=" + + blockj + ", block" + i + "=" + blocki); + } + if (blocki.getBlockId() - i != blockj.getBlockId() - j) { + throw new IOException("Block IDs mismatched: block" + j + "=" + + blockj + ", block" + i + "=" + blocki); + } + if (blocki.getGenerationStamp() != blockj.getGenerationStamp()) { + throw new IOException("Generation stamps mismatched: block" + j + "=" + + blockj + ", block" + i + "=" + blocki); + } + } + +}
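To make the striping arithmetic in StripedBlockUtil.java concrete, the following standalone sketch (not part of the patch) exercises the public helpers added above. The 6 data + 3 parity layout and the 64 KB cell size are values assumed for illustration, not defaults taken from HdfsConstants or a particular ECSchema.

    import org.apache.hadoop.hdfs.util.StripedBlockUtil;

    public class StripedBlockUtilExample {
      public static void main(String[] args) {
        final int dataBlkNum = 6;        // assumed number of data units
        final int parityBlkNum = 3;      // assumed number of parity units
        final int cellSize = 64 * 1024;  // assumed cell size (64 KB)

        // A block group holding three full cells plus a 1000-byte tail of data.
        final long groupDataSize = 3L * cellSize + 1000;

        // Internal block lengths: blocks 0-2 each hold one full cell (65536 bytes),
        // block 3 holds the 1000-byte tail, blocks 4-5 hold nothing, and every
        // parity block is sized like block 0.
        for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
          System.out.println("block " + i + " length = "
              + StripedBlockUtil.getInternalBlockLength(
                  groupDataSize, cellSize, dataBlkNum, i));
        }

        // Map byte 70000 of internal block 2 back to its logical offset in the
        // block group: one full stripe (6 cells) + 2 cells + the partial cell.
        System.out.println(StripedBlockUtil.offsetInBlkToOffsetInBG(
            cellSize, dataBlkNum, 70000L, 2));   // prints 528752

        // Decoding places parity inputs before data inputs, so data block 0 maps
        // to decode index 3, and convertDecodeIndexBack() undoes the mapping.
        int decodeIdx = StripedBlockUtil.convertIndex4Decode(0, dataBlkNum, parityBlkNum);
        System.out.println(decodeIdx);                                 // prints 3
        System.out.println(StripedBlockUtil.convertDecodeIndexBack(
            decodeIdx, dataBlkNum, parityBlkNum));                     // prints 0
      }
    }

Running a sketch like this against the patch is a quick way to sanity-check the cell and stripe diagrams in the class Javadoc above.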
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index b87e753..9d24b91 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -33,6 +33,7 @@ package hadoop.hdfs.datanode; import "HAServiceProtocol.proto"; import "hdfs.proto"; +import "erasurecoding.proto"; /** * Information to identify a datanode to a namenode @@ -58,6 +59,7 @@ message DatanodeCommandProto { UnusedUpgradeCommand = 6; NullDatanodeCommand = 7; BlockIdCommand = 8; + BlockECRecoveryCommand = 9; } required Type cmdType = 1; // Type of the command @@ -71,6 +73,7 @@ message DatanodeCommandProto { optional KeyUpdateCommandProto keyUpdateCmd = 6; optional RegisterCommandProto registerCmd = 7; optional BlockIdCommandProto blkIdCmd = 8; + optional BlockECRecoveryCommandProto blkECRecoveryCmd = 9; } /** @@ -145,6 +148,13 @@ message RegisterCommandProto { } /** + * Block Erasure coding recovery command + */ +message BlockECRecoveryCommandProto { + repeated BlockECRecoveryInfoProto blockECRecoveryinfo = 1; +} + +/** * registration - Information of the datanode registering with the namenode */ message RegisterDatanodeRequestProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto index 3bd1d91..3233f66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto @@ -73,6 +73,7 @@ message NameSystemSection { optional uint64 lastAllocatedBlockId = 5; optional uint64 transactionId = 6; optional uint64 rollingUpgradeStartTime = 7; + optional uint64 lastAllocatedStripedBlockId = 8; } /** @@ -139,6 +140,8 @@ message INodeSection { optional AclFeatureProto acl = 8; optional XAttrFeatureProto xAttrs = 9; optional uint32 storagePolicyID = 10; + optional bool isStriped = 11; + optional uint64 stripingCellSize = 12; } message QuotaByStorageTypeEntryProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 8cb7d5f..fcf2bc1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2370,11 +2370,11 @@ </description> </property> - <property> - <name>dfs.datanode.block-pinning.enabled</name> - <value>false</value> - <description>Whether pin blocks on favored DataNode.</description> - </property> +<property> + <name>dfs.datanode.block-pinning.enabled</name> + <value>false</value> + <description>Whether pin blocks on favored DataNode.</description> +</property> <property> <name>dfs.client.block.write.locateFollowingBlock.initial.delay.ms</name> @@ -2412,4 +2412,25 @@ </description> </property> 
+<property> + <name>dfs.datanode.stripedread.threshold.millis</name> + <value>5000</value> + <description>datanode striped read threshold in millisecond. + </description> +</property> + +<property> + <name>dfs.datanode.stripedread.threads</name> + <value>20</value> + <description>datanode striped read thread pool size. + </description> +</property> + +<property> + <name>dfs.datanode.stripedread.buffer.size</name> + <value>262144</value> + <description>datanode striped read buffer size. + </description> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java new file mode 100644 index 0000000..6c06a8d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cli; + +import org.apache.hadoop.cli.util.CLICommandErasureCodingCli; +import org.apache.hadoop.cli.util.CLICommandTypes; +import org.apache.hadoop.cli.util.CLITestCmd; +import org.apache.hadoop.cli.util.CommandExecutor; +import org.apache.hadoop.cli.util.ErasureCodingCliCmdExecutor; +import org.apache.hadoop.hdfs.tools.erasurecode.ECCli; + +public class CLITestCmdErasureCoding extends CLITestCmd { + public CLITestCmdErasureCoding(String str, CLICommandTypes type) { + super(str, type); + } + + @Override + public CommandExecutor getExecutor(String tag) throws IllegalArgumentException { + if (getType() instanceof CLICommandErasureCodingCli) + return new ErasureCodingCliCmdExecutor(tag, new ECCli()); + return super.getExecutor(tag); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java new file mode 100644 index 0000000..5f01ea2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.cli; + +import org.apache.hadoop.cli.util.CLICommand; +import org.apache.hadoop.cli.util.CLICommandErasureCodingCli; +import org.apache.hadoop.cli.util.CommandExecutor.Result; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.xml.sax.SAXException; + +public class TestErasureCodingCLI extends CLITestHelper { + private final int NUM_OF_DATANODES = 3; + private MiniDFSCluster dfsCluster = null; + private FileSystem fs = null; + private String namenode = null; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + dfsCluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(NUM_OF_DATANODES).build(); + dfsCluster.waitClusterUp(); + namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///"); + + username = System.getProperty("user.name"); + + fs = dfsCluster.getFileSystem(); + } + + @Override + protected String getTestFile() { + return "testErasureCodingConf.xml"; + } + + @After + @Override + public void tearDown() throws Exception { + if (fs != null) { + fs.close(); + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + Thread.sleep(2000); + super.tearDown(); + } + + @Override + protected String expandCommand(final String cmd) { + String expCmd = cmd; + expCmd = expCmd.replaceAll("NAMENODE", namenode); + expCmd = expCmd.replaceAll("#LF#", System.getProperty("line.separator")); + expCmd = super.expandCommand(expCmd); + return expCmd; + } + + @Override + protected TestConfigFileParser getConfigParser() { + return new TestErasureCodingAdmin(); + } + + private class TestErasureCodingAdmin extends + CLITestHelper.TestConfigFileParser { + @Override + public void endElement(String uri, String localName, String qName) + throws SAXException { + if (qName.equals("ec-admin-command")) { + if (testCommands != null) { + testCommands.add(new CLITestCmdErasureCoding(charString, + new CLICommandErasureCodingCli())); + } else if (cleanupCommands != null) { + cleanupCommands.add(new CLITestCmdErasureCoding(charString, + new CLICommandErasureCodingCli())); + } + } else { + super.endElement(uri, localName, qName); + } + } + } + + @Override + protected Result execute(CLICommand cmd) throws Exception { + return cmd.getExecutor(namenode).executeCommand(cmd.getCmd()); + } + + @Test + @Override + public void testAll() { + super.testAll(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandErasureCodingCli.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandErasureCodingCli.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandErasureCodingCli.java new file mode 100644 index 0000000..aafcd9f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandErasureCodingCli.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cli.util; + +public class CLICommandErasureCodingCli implements CLICommandTypes { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/ErasureCodingCliCmdExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/ErasureCodingCliCmdExecutor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/ErasureCodingCliCmdExecutor.java new file mode 100644 index 0000000..e993313 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/ErasureCodingCliCmdExecutor.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.cli.util; + +import org.apache.hadoop.hdfs.tools.erasurecode.ECCli; +import org.apache.hadoop.util.ToolRunner; + +public class ErasureCodingCliCmdExecutor extends CommandExecutor { + protected String namenode = null; + protected ECCli admin = null; + + public ErasureCodingCliCmdExecutor(String namenode, ECCli admin) { + this.namenode = namenode; + this.admin = admin; + } + + @Override + protected void execute(final String cmd) throws Exception { + String[] args = getCommandAsArgs(cmd, "NAMENODE", this.namenode); + ToolRunner.run(admin, args); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java index 88b7f37..829cf03 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java @@ -165,20 +165,19 @@ public class BlockReaderTestUtil { */ public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead) throws IOException { - return getBlockReader(cluster, testBlock, offset, lenToRead); + return getBlockReader(cluster.getFileSystem(), testBlock, offset, lenToRead); } /** * Get a BlockReader for the given block. */ - public static BlockReader getBlockReader(MiniDFSCluster cluster, - LocatedBlock testBlock, int offset, int lenToRead) throws IOException { + public static BlockReader getBlockReader(final DistributedFileSystem fs, + LocatedBlock testBlock, int offset, long lenToRead) throws IOException { InetSocketAddress targetAddr = null; ExtendedBlock block = testBlock.getBlock(); DatanodeInfo[] nodes = testBlock.getLocations(); targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); - final DistributedFileSystem fs = cluster.getFileSystem(); return new BlockReaderFactory(fs.getClient().getConf()). setInetSocketAddress(targetAddr). setBlock(block). 
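With the BlockReaderTestUtil change above, the static helper is keyed off a DistributedFileSystem rather than a MiniDFSCluster, and lenToRead is now a long. A hypothetical call site might look like the sketch below; only the getBlockReader signature comes from the change, while the helper name, arguments, and surrounding test imports are illustrative assumptions.

    // Hypothetical test helper (not part of the patch): read the start of the
    // first located block of a file through the refactored static API.
    static byte[] readFirstBlock(MiniDFSCluster cluster, Path file, int len)
        throws IOException {
      DistributedFileSystem fs = cluster.getFileSystem();
      LocatedBlock firstBlock =
          fs.getClient().getLocatedBlocks(file.toString(), 0).get(0);
      // The reader is now built from the filesystem, not the cluster object.
      BlockReader reader =
          BlockReaderTestUtil.getBlockReader(fs, firstBlock, 0, len);
      byte[] buf = new byte[len];
      int nRead = reader.read(buf, 0, len);  // may return fewer bytes than requested
      reader.close();
      return buf;
    }

Passing the filesystem directly lets striped-read tests reuse the same helper against any DistributedFileSystem instance instead of requiring a MiniDFSCluster handle.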
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index a742757..3f0d6df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -66,6 +66,12 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; @@ -105,6 +111,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -125,14 +132,19 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.namenode.FSDirectory; import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; @@ -153,12 +165,8 @@ import org.junit.Assume; import org.mockito.internal.util.reflection.Whitebox; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; /** Utilities for HDFS tests */ public class DFSTestUtil { @@ -808,15 +816,21 @@ 
public class DFSTestUtil { return os.toByteArray(); } - /* Write the given string to the given file */ - public static void writeFile(FileSystem fs, Path p, String s) + /* Write the given bytes to the given file */ + public static void writeFile(FileSystem fs, Path p, byte[] bytes) throws IOException { if (fs.exists(p)) { fs.delete(p, true); } - InputStream is = new ByteArrayInputStream(s.getBytes()); + InputStream is = new ByteArrayInputStream(bytes); FSDataOutputStream os = fs.create(p); - IOUtils.copyBytes(is, os, s.length(), true); + IOUtils.copyBytes(is, os, bytes.length, true); + } + + /* Write the given string to the given file */ + public static void writeFile(FileSystem fs, Path p, String s) + throws IOException { + writeFile(fs, p, s.getBytes()); } /* Append the given string to the given file */ @@ -1835,7 +1849,7 @@ public class DFSTestUtil { dn.setLastUpdate(Time.now() + offset); dn.setLastUpdateMonotonic(Time.monotonicNow() + offset); } - + /** * This method takes a set of block locations and fills the provided buffer * with expected bytes based on simulated content from @@ -1859,4 +1873,150 @@ public class DFSTestUtil { } } + public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock( + Block block, BlockStatus blockStatus, DatanodeStorage storage) { + ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1]; + receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null); + StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1]; + reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks); + return reports; + } + + /** + * Creates the metadata of a file in striped layout. This method only + * manipulates the NameNode state without injecting data to DataNode. + * You should disable periodical heartbeat before use this. + * @param file Path of the file to create + * @param dir Parent path of the file + * @param numBlocks Number of striped block groups to add to the file + * @param numStripesPerBlk Number of striped cells in each block + * @param toMkdir + */ + public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir, + int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception { + DistributedFileSystem dfs = cluster.getFileSystem(); + // If outer test already created EC zone, dir should be left as null + if (toMkdir) { + assert dir != null; + dfs.mkdirs(dir); + try { + dfs.getClient().createErasureCodingZone(dir.toString(), null, 0); + } catch (IOException e) { + if (!e.getMessage().contains("non-empty directory")) { + throw e; + } + } + } + + FSDataOutputStream out = null; + try { + out = dfs.create(file, (short) 1); // create an empty file + + FSNamesystem ns = cluster.getNamesystem(); + FSDirectory fsdir = ns.getFSDirectory(); + INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile(); + + ExtendedBlock previous = null; + for (int i = 0; i < numBlocks; i++) { + Block newBlock = addStripedBlockToFile(cluster.getDataNodes(), dfs, ns, + file.toString(), fileNode, dfs.getClient().getClientName(), + previous, numStripesPerBlk); + previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock); + } + + dfs.getClient().namenode.complete(file.toString(), + dfs.getClient().getClientName(), previous, fileNode.getId()); + } finally { + IOUtils.cleanup(null, out); + } + } + + /** + * Adds a striped block group to a file. This method only manipulates NameNode + * states of the file and the block without injecting data to DataNode. + * It does mimic block reports. 
+ * You should disable periodical heartbeat before use this. + * @param dataNodes List DataNodes to host the striped block group + * @param previous Previous block in the file + * @param numStripes Number of stripes in each block group + * @return The added block group + */ + public static Block addStripedBlockToFile(List<DataNode> dataNodes, + DistributedFileSystem fs, FSNamesystem ns, String file, INodeFile fileNode, + String clientName, ExtendedBlock previous, int numStripes) + throws Exception { + fs.getClient().namenode.addBlock(file, clientName, previous, null, + fileNode.getId(), null); + + final BlockInfo lastBlock = fileNode.getLastBlock(); + final int groupSize = fileNode.getPreferredBlockReplication(); + assert dataNodes.size() >= groupSize; + // 1. RECEIVING_BLOCK IBR + for (int i = 0; i < groupSize; i++) { + DataNode dn = dataNodes.get(i); + final Block block = new Block(lastBlock.getBlockId() + i, 0, + lastBlock.getGenerationStamp()); + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(block, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage); + for (StorageReceivedDeletedBlocks report : reports) { + ns.processIncrementalBlockReport(dn.getDatanodeId(), report); + } + } + + // 2. RECEIVED_BLOCK IBR + for (int i = 0; i < groupSize; i++) { + DataNode dn = dataNodes.get(i); + final Block block = new Block(lastBlock.getBlockId() + i, + numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp()); + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(block, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + for (StorageReceivedDeletedBlocks report : reports) { + ns.processIncrementalBlockReport(dn.getDatanodeId(), report); + } + } + + lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS); + return lastBlock; + } + + /** + * Because currently DFSStripedOutputStream does not support hflush/hsync, + * tests can use this method to flush all the buffered data to DataNodes. + */ + public static ExtendedBlock flushInternal(DFSStripedOutputStream out) + throws IOException { + out.flushInternal(); + return out.getBlock(); + } + + /** + * Verify that blocks in striped block group are on different nodes, and every + * internal blocks exists. 
+ */ + public static void verifyLocatedStripedBlocks(LocatedBlocks lbs, + int groupSize) { + for (LocatedBlock lb : lbs.getLocatedBlocks()) { + assert lb instanceof LocatedStripedBlock; + HashSet<DatanodeInfo> locs = new HashSet<>(); + for (DatanodeInfo datanodeInfo : lb.getLocations()) { + locs.add(datanodeInfo); + } + assertEquals(groupSize, lb.getLocations().length); + assertEquals(groupSize, locs.size()); + + // verify that every internal blocks exists + int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices(); + assertEquals(groupSize, blockIndices.length); + HashSet<Integer> found = new HashSet<>(); + for (int index : blockIndices) { + assert index >=0; + found.add(index); + } + assertEquals(groupSize, found.size()); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 7052321..65e26df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2113,8 +2113,6 @@ public class MiniDFSCluster { int node = -1; for (int i = 0; i < dataNodes.size(); i++) { DataNode dn = dataNodes.get(i).datanode; - LOG.info("DN name=" + dnName + " found DN=" + dn + - " with name=" + dn.getDisplayName()); if (dnName.equals(dn.getDatanodeId().getXferAddr())) { node = i; break; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java new file mode 100644 index 0000000..ca4b2aa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.web.ByteRangeInputStream; +import org.junit.Assert; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +public class StripedFileTestUtil { + public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class); + + static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + + static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + static final int stripesPerBlock = 4; + static final int blockSize = cellSize * stripesPerBlock; + static final int numDNs = dataBlocks + parityBlocks + 2; + + static final Random random = new Random(); + + static byte[] generateBytes(int cnt) { + byte[] bytes = new byte[cnt]; + for (int i = 0; i < cnt; i++) { + bytes[i] = getByte(i); + } + return bytes; + } + + static int readAll(FSDataInputStream in, byte[] buf) throws IOException { + int readLen = 0; + int ret; + while ((ret = in.read(buf, readLen, buf.length - readLen)) >= 0 && + readLen <= buf.length) { + readLen += ret; + } + return readLen; + } + + static byte getByte(long pos) { + final int mod = 29; + return (byte) (pos % mod + 1); + } + + static void verifyLength(FileSystem fs, Path srcPath, int fileLength) + throws IOException { + FileStatus status = fs.getFileStatus(srcPath); + Assert.assertEquals("File length should be the same", fileLength, status.getLen()); + } + + static void verifyPread(FileSystem fs, Path srcPath, int fileLength, + byte[] expected, byte[] buf) throws IOException { + try (FSDataInputStream in = fs.open(srcPath)) { + int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102, + cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102, + cellSize * dataBlocks, fileLength - 102, fileLength - 1}; + for (int startOffset : startOffsets) { + startOffset = Math.max(0, Math.min(startOffset, fileLength - 1)); + int remaining = fileLength - startOffset; + int offset = startOffset; + final byte[] result = new byte[remaining]; + while (remaining > 0) { + int target = Math.min(remaining, buf.length); + in.readFully(offset, buf, 0, target); + System.arraycopy(buf, 0, result, offset - startOffset, target); + remaining -= target; + offset += target; + } + for (int i = 0; i < fileLength - startOffset; i++) { + Assert.assertEquals("Byte at " + (startOffset + i) + " is different, " + + "the startOffset is " + startOffset, + expected[startOffset + i], result[i]); + } + } + } + } + + static void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength, + byte[] expected, byte[] buf) throws IOException { + try (FSDataInputStream in = fs.open(srcPath)) { + final byte[] result = new byte[fileLength]; + int readLen = 0; + int ret; + while ((ret = in.read(buf, 0, buf.length)) >= 0) { + System.arraycopy(buf, 0, result, readLen, ret); + readLen += ret; + } + Assert.assertEquals("The length of file should be the same to write size", + fileLength, readLen); + Assert.assertArrayEquals(expected, result); + } + } + + static void verifyStatefulRead(FileSystem fs, Path srcPath, 
int fileLength, + byte[] expected, ByteBuffer buf) throws IOException { + try (FSDataInputStream in = fs.open(srcPath)) { + ByteBuffer result = ByteBuffer.allocate(fileLength); + int readLen = 0; + int ret; + while ((ret = in.read(buf)) >= 0) { + readLen += ret; + buf.flip(); + result.put(buf); + buf.clear(); + } + Assert.assertEquals("The length of file should be the same to write size", + fileLength, readLen); + Assert.assertArrayEquals(expected, result.array()); + } + } + + static void verifySeek(FileSystem fs, Path srcPath, int fileLength) + throws IOException { + try (FSDataInputStream in = fs.open(srcPath)) { + // seek to 1/2 of content + int pos = fileLength / 2; + assertSeekAndRead(in, pos, fileLength); + + // seek to 1/3 of content + pos = fileLength / 3; + assertSeekAndRead(in, pos, fileLength); + + // seek to 0 pos + pos = 0; + assertSeekAndRead(in, pos, fileLength); + + if (fileLength > cellSize) { + // seek to cellSize boundary + pos = cellSize - 1; + assertSeekAndRead(in, pos, fileLength); + } + + if (fileLength > cellSize * dataBlocks) { + // seek to striped cell group boundary + pos = cellSize * dataBlocks - 1; + assertSeekAndRead(in, pos, fileLength); + } + + if (fileLength > blockSize * dataBlocks) { + // seek to striped block group boundary + pos = blockSize * dataBlocks - 1; + assertSeekAndRead(in, pos, fileLength); + } + + if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) { + try { + in.seek(-1); + Assert.fail("Should be failed if seek to negative offset"); + } catch (EOFException e) { + // expected + } + + try { + in.seek(fileLength + 1); + Assert.fail("Should be failed if seek after EOF"); + } catch (EOFException e) { + // expected + } + } + } + } + + static void assertSeekAndRead(FSDataInputStream fsdis, int pos, + int writeBytes) throws IOException { + fsdis.seek(pos); + byte[] buf = new byte[writeBytes]; + int readLen = StripedFileTestUtil.readAll(fsdis, buf); + Assert.assertEquals(readLen, writeBytes - pos); + for (int i = 0; i < readLen; i++) { + Assert.assertEquals("Byte at " + i + " should be the same", + StripedFileTestUtil.getByte(pos + i), buf[i]); + } + } + + static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out, + final int dnIndex, final AtomicInteger pos) { + final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex); + final DatanodeInfo datanode = getDatanodes(s); + LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos); + cluster.stopDataNode(datanode.getXferAddr()); + } + + static DatanodeInfo getDatanodes(StripedDataStreamer streamer) { + for(;;) { + final DatanodeInfo[] datanodes = streamer.getNodes(); + if (datanodes != null) { + Assert.assertEquals(1, datanodes.length); + Assert.assertNotNull(datanodes[0]); + return datanodes[0]; + } + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + return null; + } + } + } + + /** + * Generate n random and different numbers within + * specified non-negative integer range + * @param min minimum of the range + * @param max maximum of the range + * @param n number to be generated + * @return + */ + public static int[] randomArray(int min, int max, int n){ + if (n > (max - min + 1) || max < min || min < 0 || max < 0) { + return null; + } + int[] result = new int[n]; + for (int i = 0; i < n; i++) { + result[i] = -1; + } + + int count = 0; + while(count < n) { + int num = (int) (Math.random() * (max - min)) + min; + boolean flag = true; + for (int j = 0; j < n; j++) { + if(num == result[j]){ + flag = false; + break; + } + } + if(flag){ + 
result[count] = num; + count++; + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java index d8aceff..1a767c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java @@ -250,8 +250,8 @@ public class TestBlockReaderFactory { LocatedBlock lblock = locatedBlocks.get(0); // first block BlockReader blockReader = null; try { - blockReader = BlockReaderTestUtil. - getBlockReader(cluster, lblock, 0, TEST_FILE_LEN); + blockReader = BlockReaderTestUtil.getBlockReader( + cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); Assert.fail("expected getBlockReader to fail the first time."); } catch (Throwable t) { Assert.assertTrue("expected to see 'TCP reads were disabled " + @@ -265,8 +265,8 @@ public class TestBlockReaderFactory { // Second time should succeed. try { - blockReader = BlockReaderTestUtil. - getBlockReader(cluster, lblock, 0, TEST_FILE_LEN); + blockReader = BlockReaderTestUtil.getBlockReader( + cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); } catch (Throwable t) { LOG.error("error trying to retrieve a block reader " + "the second time.", t); @@ -474,8 +474,8 @@ public class TestBlockReaderFactory { while (true) { BlockReader blockReader = null; try { - blockReader = BlockReaderTestUtil. - getBlockReader(cluster, lblock, 0, TEST_FILE_LEN); + blockReader = BlockReaderTestUtil.getBlockReader( + cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); sem.release(); try { blockReader.readAll(buf, 0, TEST_FILE_LEN); @@ -514,8 +514,8 @@ public class TestBlockReaderFactory { // getting a ClosedChannelException. BlockReader blockReader = null; try { - blockReader = BlockReaderTestUtil. 
- getBlockReader(cluster, lblock, 0, TEST_FILE_LEN); + blockReader = BlockReaderTestUtil.getBlockReader( + cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); blockReader.readFully(buf, 0, TEST_FILE_LEN); } finally { if (blockReader != null) blockReader.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 441ef9c..c68bd28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -257,12 +257,12 @@ public class TestDFSClientRetries { Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( (short) 777), "owner", "group", new byte[0], new byte[0], - 1010, 0, null, (byte) 0)).when(mockNN).getFileInfo(anyString()); + 1010, 0, null, (byte) 0, null, 0)).when(mockNN).getFileInfo(anyString()); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( (short) 777), "owner", "group", new byte[0], new byte[0], - 1010, 0, null, (byte) 0)) + 1010, 0, null, (byte) 0, null, 0)) .when(mockNN) .create(anyString(), (FsPermission) anyObject(), anyString(), (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(), @@ -550,7 +550,7 @@ public class TestDFSClientRetries { badBlocks.add(badLocatedBlock); return new LocatedBlocks(goodBlockList.getFileLength(), false, badBlocks, null, true, - null); + null, null, 0); } }
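The new DFSTestUtil helpers introduced in this patch (createStripedFile, addStripedBlockToFile, verifyLocatedStripedBlocks) let a test build striped-file metadata on the NameNode, with mimicked incremental block reports, without ever writing data to DataNodes. The following is a minimal sketch of how a caller might wire them together; it is not part of the patch. The test class name, paths, the choice of two block groups with four stripes each, and the large-heartbeat-interval trick for keeping periodic heartbeats out of the way are all illustrative assumptions.

package org.apache.hadoop.hdfs;   // the helpers assume callers in this package

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.junit.Test;

/** Hypothetical test scaffold, shown only to illustrate the new helpers. */
public class TestStripedFileMetadataSketch {
  @Test
  public void testCreateAndVerifyStripedFile() throws Exception {
    Configuration conf = new HdfsConfiguration();
    // One possible way to keep periodic heartbeats from interfering, as the
    // createStripedFile() javadoc requires (interval is in seconds).
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3600L);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS)
        .build();
    try {
      cluster.waitActive();
      Path dir = new Path("/ecDir");          // hypothetical paths
      Path file = new Path(dir, "ecFile");

      // Two block groups, four stripes per group; let the helper create the
      // directory and the erasure coding zone (toMkdir = true).
      DFSTestUtil.createStripedFile(cluster, file, dir, 2, 4, true);

      long fileLen = 2L * 4 * HdfsConstants.BLOCK_STRIPED_CELL_SIZE
          * HdfsConstants.NUM_DATA_BLOCKS;
      LocatedBlocks lbs = cluster.getFileSystem().getClient()
          .getLocatedBlocks(file.toString(), 0, fileLen);

      // Each block group should report one distinct DataNode per internal
      // block and a complete set of block indices.
      DFSTestUtil.verifyLocatedStripedBlocks(lbs,
          HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS);
    } finally {
      cluster.shutdown();
    }
  }
}

Because only NameNode state is touched, such a test can exercise block-group allocation, the RECEIVING_BLOCK/RECEIVED_BLOCK report handling, and location reporting far faster than a test that streams real data through DFSStripedOutputStream.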

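The read-side helpers added in StripedFileTestUtil (generateBytes, verifyLength, verifyPread, verifyStatefulRead, verifySeek) are meant to be chained over a file written with the deterministic getByte() pattern, and the new writeFile(FileSystem, Path, byte[]) overload in DFSTestUtil makes that straightforward. The helper below is a sketch of that sequence, not code from the patch; it assumes a surrounding test class in the org.apache.hadoop.hdfs package (the StripedFileTestUtil members are package-private), the usual imports, and a file system whose target directory sits inside an erasure coding zone.

  // Hypothetical helper: write a striped file carrying the expected byte
  // pattern, then run the full battery of read checks against it.
  static void writeAndVerifyStripedFile(FileSystem fs, Path srcPath, int fileLength)
      throws IOException {
    // Deterministic content that the verify* methods compare against.
    byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
    DFSTestUtil.writeFile(fs, srcPath, expected);   // byte[] overload added above

    byte[] buf = new byte[fileLength];              // scratch buffer for reads
    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, buf);
    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, buf);
    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
        ByteBuffer.allocate(fileLength));
    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
  }

A length slightly larger than one full block group, for example StripedFileTestUtil.blockSize * StripedFileTestUtil.dataBlocks + 123, is a reasonable choice here because it forces the pread, stateful-read, and seek checks to cross cell, stripe, and block-group boundaries.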