diff --cc 
index 0000000,0000000..264c532
new file mode 100644
--- /dev/null
@@@ -1,0 -1,0 +1,952 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs.util;
++import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.fs.StorageType;
++import org.apache.hadoop.hdfs.DFSClient;
++import org.apache.hadoop.hdfs.DFSStripedOutputStream;
++import org.apache.hadoop.hdfs.protocol.Block;
++import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
++import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
++import org.apache.hadoop.hdfs.protocol.LocatedBlock;
++import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
++import java.nio.ByteBuffer;
++import java.util.*;
++import java.util.concurrent.CancellationException;
++import java.util.concurrent.CompletionService;
++import java.util.concurrent.ExecutionException;
++import java.util.concurrent.Future;
++import java.util.concurrent.TimeUnit;
++/**
++ * When accessing a file in striped layout, operations on logical byte ranges
++ * in the file need to be mapped to physical byte ranges on block files stored
++ * on DataNodes. This utility class facilitates this mapping by defining and
++ * exposing a number of striping-related concepts. The most basic ones are
++ * illustrated in the following diagram. Unless otherwise specified, all
++ * range-related calculations are inclusive (the end offset of the previous
++ * range should be 1 byte lower than the start offset of the next one).
++ *
++ *  | <----  Block Group ----> |   <- Block Group: logical unit composing
++ *  |                          |        striped HDFS files.
++ *  blk_0      blk_1       blk_2   <- Internal Blocks: each internal block
++ *    |          |           |          represents a physically stored local
++ *    v          v           v          block file
++ * +------+   +------+   +------+
++ * |cell_0|   |cell_1|   |cell_2|  <- {@link StripingCell} represents the
++ * +------+   +------+   +------+       logical order that a Block Group should
++ * |cell_3|   |cell_4|   |cell_5|       be accessed: cell_0, cell_1, ...
++ * +------+   +------+   +------+
++ * |cell_6|   |cell_7|   |cell_8|
++ * +------+   +------+   +------+
++ * |cell_9|
++ * +------+  <- A cell contains cellSize bytes of data
++ */
++public class StripedBlockUtil {
++  /**
++   * This method parses a striped block group into individual blocks.
++   *
++   * @param bg The striped block group
++   * @param cellSize The size of a striping cell
++   * @param dataBlkNum The number of data blocks
++   * @param parityBlkNum The number of parity blocks
++   * @return An array of length {@code dataBlkNum + parityBlkNum} indexed by
++   *         each block's index in the group; a slot stays {@code null} when
++   *         {@code bg} reports no block for that index
++   */
++  public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
++      int cellSize, int dataBlkNum, int parityBlkNum) {
++    final int groupWidth = dataBlkNum + parityBlkNum;
++    final int locatedBGSize = bg.getBlockIndices().length;
++    final LocatedBlock[] lbs = new LocatedBlock[groupWidth];
++    // Use an int counter: a short one would wrap around to a negative index
++    // if the reported index array were longer than Short.MAX_VALUE.
++    for (int i = 0; i < locatedBGSize; i++) {
++      final int idx = bg.getBlockIndices()[i];
++      // for now we do not use redundant replica of an internal block
++      if (idx < groupWidth && lbs[idx] == null) {
++        lbs[idx] = constructInternalBlock(bg, i, cellSize,
++            dataBlkNum, idx);
++      }
++    }
++    return lbs;
++  }
++  /**
++   * Builds the {@link LocatedBlock} of one internal block of a block group.
++   *
++   * @param bg The striped block group
++   * @param idxInReturnedLocs The index in the stored locations in the
++   *                          {@link LocatedStripedBlock} object
++   * @param cellSize The size of a striping cell
++   * @param dataBlkNum The number of data blocks
++   * @param idxInBlockGroup The logical index in the striped block group
++   * @return The constructed internal block; it is created with null
++   *         location, storage ID and storage type arrays when
++   *         {@code idxInReturnedLocs} is beyond the stored locations
++   */
++  public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
++      int idxInReturnedLocs, int cellSize, int dataBlkNum,
++      int idxInBlockGroup) {
++    final ExtendedBlock internal = constructInternalBlock(
++        bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
++    final boolean hasLocation = idxInReturnedLocs < bg.getLocations().length;
++    final LocatedBlock result = hasLocation
++        ? new LocatedBlock(internal,
++            new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
++            new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
++            new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
++            bg.getStartOffset(), bg.isCorrupt(), null)
++        : new LocatedBlock(internal, null, null, null,
++            bg.getStartOffset(), bg.isCorrupt(), null);
++    // The block token is attached only when one is stored for this index
++    final Token<BlockTokenIdentifier>[] tokens = bg.getBlockTokens();
++    if (idxInReturnedLocs < tokens.length) {
++      result.setBlockToken(tokens[idxInReturnedLocs]);
++    }
++    return result;
++  }
++  /**
++   * This method creates an internal {@link ExtendedBlock} at the given index
++   * of a block group: a new block built from the group's block, whose ID is
++   * the group's ID plus the index and whose length is the length of that
++   * internal block.
++   *
++   * @param blockGroup The block representing the whole block group
++   * @param cellSize The size of a striping cell
++   * @param dataBlkNum The number of data blocks
++   * @param idxInBlockGroup The logical index in the striped block group
++   * @return The internal block at {@code idxInBlockGroup}
++   */
++  public static ExtendedBlock constructInternalBlock(ExtendedBlock blockGroup,
++      int cellSize, int dataBlkNum, int idxInBlockGroup) {
++    final long internalId = blockGroup.getBlockId() + idxInBlockGroup;
++    final long internalLen = getInternalBlockLength(
++        blockGroup.getNumBytes(), cellSize, dataBlkNum, idxInBlockGroup);
++    final ExtendedBlock internal = new ExtendedBlock(blockGroup);
++    internal.setBlockId(internalId);
++    internal.setNumBytes(internalLen);
++    return internal;
++  }
++  /**
++   * Get the size of an internal block at the given index of a block group
++   *
++   * @param dataSize Size of the block group only counting data blocks
++   * @param cellSize The size of a striping cell
++   * @param numDataBlocks The number of data blocks
++   * @param i The logical index in the striped block group
++   * @return The size of the internal block at the specified index
++   */
++  public static long getInternalBlockLength(long dataSize,
++      int cellSize, int numDataBlocks, int i) {
++    Preconditions.checkArgument(dataSize >= 0);
++    Preconditions.checkArgument(cellSize > 0);
++    Preconditions.checkArgument(numDataBlocks > 0);
++    Preconditions.checkArgument(i >= 0);
++    // Number of data bytes held by one full stripe
++    final int bytesPerStripe = cellSize * numDataBlocks;
++    // Data bytes in the last, possibly partial, stripe
++    final int tailLen = (int) (dataSize % bytesPerStripe);
++    if (tailLen != 0) {
++      // All stripes but the last contribute one full cell to this block;
++      // the last stripe contributes whatever part of its data lands here.
++      final int stripeCount = (int) ((dataSize - 1) / bytesPerStripe + 1);
++      final long fullCellBytes = (stripeCount - 1L) * cellSize;
++      return fullCellBytes
++          + lastCellSize(tailLen, cellSize, numDataBlocks, i);
++    }
++    // If block group ends at stripe boundary, each internal block has an
++    // equal share of the group
++    return dataSize / numDataBlocks;
++  }
++  /**
++   * Size of the cell that block {@code i} holds in the last stripe.
++   *
++   * @param size Number of data bytes in the last stripe
++   * @param cellSize The size of a striping cell
++   * @param numDataBlocks The number of data blocks
++   * @param i The logical index in the striped block group
++   * @return The number of bytes of the last stripe stored in block {@code i}
++   */
++  private static int lastCellSize(int size, int cellSize, int numDataBlocks,
++      int i) {
++    int remaining = size;
++    // parity block size (i.e. i >= numDataBlocks) is the same as
++    // the first data block size (i.e. i = 0), so only data blocks skip the
++    // bytes that belong to the cells before them.
++    if (i < numDataBlocks) {
++      remaining = Math.max(0, size - i*cellSize);
++    }
++    return Math.min(remaining, cellSize);
++  }
++  /**
++   * Given a byte's offset in an internal block, calculate the offset in
++   * the block group
++   *
++   * @param cellSize The size of a striping cell
++   * @param dataBlkNum The number of data blocks
++   * @param offsetInBlk The byte's offset in the internal block
++   * @param idxInBlockGroup The index of the internal block in the block group
++   * @return The byte's offset in the block group
++   */
++  public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum,
++      long offsetInBlk, int idxInBlockGroup) {
++    // Keep every product in long arithmetic: with int operands the stripe
++    // term wraps around once the offset in the block group passes 2 GB.
++    final long cellIdxInBlk = offsetInBlk / cellSize;
++    return cellIdxInBlk * cellSize * dataBlkNum // n full stripes before offset
++        + (long) idxInBlockGroup * cellSize // m full cells before offset
++        + offsetInBlk % cellSize; // partial cell
++  }
++  /**
++   * Get the next completed striped read task
++   *
++   * @param readService The completion service the read tasks were submitted to
++   * @param futures The outstanding read tasks mapped to their block index;
++   *          must not be empty. The completed task is removed from this map.
++   * @param timeoutMillis How long to wait, in milliseconds; when not
++   *          positive, wait until a task completes
++   * @return {@link StripingChunkReadResult} indicating the status of the read
++   *          task succeeded, and the block index of the task. If the method
++   *          times out without getting any completed read tasks, -1 is
++   *          returned as block index.
++   * @throws InterruptedException if interrupted while waiting
++   */
++  public static StripingChunkReadResult getNextCompletedStripedRead(
++      CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
++      final long timeoutMillis) throws InterruptedException {
++    Preconditions.checkArgument(!futures.isEmpty());
++    Future<Void> completed = null;
++    try {
++      completed = timeoutMillis > 0
++          ? readService.poll(timeoutMillis, TimeUnit.MILLISECONDS)
++          : readService.take();
++      if (completed == null) {
++        return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
++      }
++      // get() rethrows the task's failure or cancellation, if any
++      completed.get();
++      return new StripingChunkReadResult(futures.remove(completed),
++          StripingChunkReadResult.SUCCESSFUL);
++    } catch (ExecutionException e) {
++      if (DFSClient.LOG.isDebugEnabled()) {
++        DFSClient.LOG.debug("ExecutionException " + e);
++      }
++      return new StripingChunkReadResult(futures.remove(completed),
++          StripingChunkReadResult.FAILED);
++    } catch (CancellationException e) {
++      return new StripingChunkReadResult(futures.remove(completed),
++          StripingChunkReadResult.CANCELLED);
++    }
++  }
++  /**
++   * Get the total usage of the striped blocks, which is the total of data
++   * blocks and parity blocks
++   *
++   * @param numDataBlkBytes
++   *          Size of the block group only counting data blocks
++   * @param dataBlkNum
++   *          The number of data blocks
++   * @param parityBlkNum
++   *          The number of parity blocks
++   * @param cellSize
++   *          The size of a striping cell
++   * @return The total usage of data blocks and parity blocks
++   */
++  public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
++      int dataBlkNum, int parityBlkNum, int cellSize) {
++    // An index >= dataBlkNum is sized as a parity block by
++    // getInternalBlockLength, and every parity block has that same length.
++    final int parityIndex = dataBlkNum + 1;
++    final long bytesPerParityBlk = getInternalBlockLength(numDataBlkBytes,
++        cellSize, dataBlkNum, parityIndex);
++    final long numParityBlkBytes = bytesPerParityBlk * parityBlkNum;
++    return numDataBlkBytes + numParityBlkBytes;
++  }
++  /**
++   * Initialize the decoding input buffers based on the chunk states in an
++   * {@link AlignedStripe}. For each data chunk that was not initially
++   * requested, create a chunk that uses the decoding input buffer as
++   * transfer destination, so that it can be fetched for decoding.
++   *
++   * @param alignedStripe The stripe whose absent data chunks are created
++   * @param dataBlkNum The number of data blocks
++   * @param parityBlkNum The number of parity blocks
++   * @return One buffer per internal block, each as long as the stripe's span
++   *         in block, ordered as defined by {@link #convertIndex4Decode}
++   */
++  public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
++      int dataBlkNum, int parityBlkNum) {
++    byte[][] decodeInputs =
++        new byte[dataBlkNum + parityBlkNum][(int) alignedStripe
++            .getSpanInBlock()];
++    // read the full data aligned stripe
++    for (int i = 0; i < dataBlkNum; i++) {
++      if (alignedStripe.chunks[i] == null) {
++        final int decodeIndex = convertIndex4Decode(i, dataBlkNum,
++            parityBlkNum);
++        // Back the new chunk with the decoder's own input buffer
++        alignedStripe.chunks[i] = new StripingChunk(
++            decodeInputs[decodeIndex]);
++        alignedStripe.chunks[i].addByteArraySlice(0,
++            (int) alignedStripe.getSpanInBlock());
++      }
++    }
++    return decodeInputs;
++  }
++  /**
++   * Some fetched {@link StripingChunk} might be stored in original
++   * buffer instead of prepared decode input buffers. Some others are beyond
++   * the range of the internal blocks and should correspond to all zero bytes.
++   * When all pending requests have returned, this method should be called to
++   * finalize decode input buffers.
++   *
++   * @param decodeInputs The decode input buffers to finalize; the entry of
++   *                     a chunk that is neither fetched nor all-zero is set
++   *                     to null
++   * @param dataBlkNum The number of data blocks
++   * @param parityBlkNum The number of parity blocks
++   * @param alignedStripe The stripe whose chunk states are consulted
++   */
++  public static void finalizeDecodeInputs(final byte[][] decodeInputs,
++      int dataBlkNum, int parityBlkNum, AlignedStripe alignedStripe) {
++    for (int i = 0; i < alignedStripe.chunks.length; i++) {
++      final StripingChunk chunk = alignedStripe.chunks[i];
++      final int decodeIndex = convertIndex4Decode(i, dataBlkNum,
++          parityBlkNum);
++      if (chunk != null && chunk.state == StripingChunk.FETCHED) {
++        chunk.copyTo(decodeInputs[decodeIndex]);
++      } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
++        Arrays.fill(decodeInputs[decodeIndex], (byte) 0);
++      } else {
++        // Not available as decoder input
++        decodeInputs[decodeIndex] = null;
++      }
++    }
++  }
++  /**
++   * Currently decoding requires parity chunks are before data chunks.
++   * The indices are opposite to what we store in NN. In future we may
++   * improve the decoding to make the indices order the same as in NN.
++   *
++   * @param index The index to convert
++   * @param dataBlkNum The number of data blocks
++   * @param parityBlkNum The number of parity blocks
++   * @return converted index
++   */
++  public static int convertIndex4Decode(int index, int dataBlkNum,
++      int parityBlkNum) {
++    if (index < dataBlkNum) {
++      // data chunk: placed after all the parity chunks
++      return index + parityBlkNum;
++    }
++    // parity chunk: moved to the front
++    return index - dataBlkNum;
++  }
++  /**
++   * Inverse of {@link #convertIndex4Decode}: maps an index in decoding order
++   * (parity chunks first) back to the index with data chunks first.
++   *
++   * @param index The decoding index to convert back
++   * @param dataBlkNum The number of data blocks
++   * @param parityBlkNum The number of parity blocks
++   * @return converted index
++   */
++  public static int convertDecodeIndexBack(int index, int dataBlkNum,
++      int parityBlkNum) {
++    if (index < parityBlkNum) {
++      // parity chunk: goes back behind all the data chunks
++      return index + dataBlkNum;
++    }
++    // data chunk
++    return index - parityBlkNum;
++  }
++  /**
++   * Decode based on the given input buffers and erasure coding policy.
++   *
++   * @param decodeInputs The decode input buffers, in decoding order
++   * @param alignedStripe The stripe whose missing data chunks are recovered
++   * @param dataBlkNum The number of data blocks
++   * @param parityBlkNum The number of parity blocks
++   * @param decoder The decoder used to recover the missing chunks
++   */
++  public static void decodeAndFillBuffer(final byte[][] decodeInputs,
++      AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
++      RawErasureDecoder decoder) {
++    // Step 1: prepare indices and output buffers for missing data units
++    int[] decodeIndices = new int[parityBlkNum];
++    int pos = 0;
++    for (int i = 0; i < dataBlkNum; i++) {
++      if (alignedStripe.chunks[i] != null &&
++          alignedStripe.chunks[i].state == StripingChunk.MISSING){
++        decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum,
++            parityBlkNum);
++      }
++    }
++    // Trim the index array to the number of missing chunks actually found
++    decodeIndices = Arrays.copyOf(decodeIndices, pos);
++    byte[][] decodeOutputs =
++        new byte[decodeIndices.length][(int) alignedStripe.getSpanInBlock()];
++    // Step 2: decode into prepared output buffers
++    decoder.decode(decodeInputs, decodeIndices, decodeOutputs);
++    // Step 3: fill original application buffer with decoded data
++    for (int i = 0; i < decodeIndices.length; i++) {
++      int missingBlkIdx = convertDecodeIndexBack(decodeIndices[i],
++          dataBlkNum, parityBlkNum);
++      StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
++      if (chunk.state == StripingChunk.MISSING) {
++        chunk.copyFrom(decodeOutputs[i]);
++      }
++    }
++  }
++  /**
++   * Similar functionality with {@link #divideByteRangeIntoStripes}, but is
++   * used by stateful read and uses ByteBuffer as reading target buffer.
++   * Besides, the read range is within a single stripe, thus the calculation
++   * logic is simpler.
++   *
++   * @param ecPolicy The codec policy for the file, which carries the numbers
++   *                 of data / parity blocks
++   * @param cellSize Cell size of stripe
++   * @param blockGroup The striped block group
++   * @param rangeStartInBlockGroup The byte range's start offset in block
++   *                               group
++   * @param rangeEndInBlockGroup The byte range's end offset in block group
++   * @param buf Destination buffer of the read operation for the byte range;
++   *            its position and limit are modified by this method
++   * @return The aligned stripes covering the byte range
++   */
++  public static AlignedStripe[] divideOneStripe(ErasureCodingPolicy ecPolicy,
++      int cellSize, LocatedStripedBlock blockGroup,
++      long rangeStartInBlockGroup, long rangeEndInBlockGroup,
++      ByteBuffer buf) {
++    final int dataBlkNum = ecPolicy.getNumDataUnits();
++    // Step 1: map the byte range to StripingCells
++    StripingCell[] cells = getStripingCellsOfByteRange(ecPolicy, cellSize,
++        blockGroup, rangeStartInBlockGroup, rangeEndInBlockGroup);
++    // Step 2: get the unmerged ranges on each internal block
++    VerticalRange[] ranges = getRangesForInternalBlocks(ecPolicy, cellSize,
++        cells);
++    // Step 3: merge into stripes
++    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
++    // Step 4: calculate each chunk's position in destination buffer. Since
++    // the whole read range is within a single stripe, the logic is simpler
++    // here.
++    int bufOffset = (int) (rangeStartInBlockGroup % ((long) cellSize *
++        dataBlkNum));
++    for (StripingCell cell : cells) {
++      // Multiply in long: the int product wraps for block offsets over 2 GB
++      long cellStart = (long) cell.idxInInternalBlk * cellSize + cell.offset;
++      long cellEnd = cellStart + cell.size - 1;
++      for (AlignedStripe s : stripes) {
++        long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
++        long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
++        long overlapEnd = Math.min(cellEnd, stripeEnd);
++        int overLapLen = (int) (overlapEnd - overlapStart + 1);
++        if (overLapLen > 0) {
++          Preconditions.checkState(s.chunks[cell.idxInStripe] == null);
++          final int pos = (int) (bufOffset + overlapStart - cellStart);
++          // Narrow the buffer to the overlap so that slice() covers only it
++          buf.position(pos);
++          buf.limit(pos + overLapLen);
++          s.chunks[cell.idxInStripe] = new StripingChunk(buf.slice());
++        }
++      }
++      bufOffset += cell.size;
++    }
++    // Step 5: prepare ALLZERO blocks
++    prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum);
++    return stripes;
++  }
++  /**
++   * This method divides a requested byte range into an array of inclusive
++   * {@link AlignedStripe}.
++   *
++   * At most 5 stripes will be generated from each logical range, as
++   * demonstrated in the header of {@link AlignedStripe}.
++   *
++   * @param ecPolicy The codec policy for the file, which carries the numbers
++   *                 of data / parity blocks
++   * @param cellSize Cell size of stripe
++   * @param blockGroup The striped block group
++   * @param rangeStartInBlockGroup The byte range's start offset in block
++   *                               group
++   * @param rangeEndInBlockGroup The byte range's end offset in block group
++   * @param buf Destination buffer of the read operation for the byte range
++   * @param offsetInBuf Start offset into the destination buffer
++   * @return The aligned stripes covering the byte range
++   */
++  public static AlignedStripe[] divideByteRangeIntoStripes(
++      ErasureCodingPolicy ecPolicy, int cellSize,
++      LocatedStripedBlock blockGroup, long rangeStartInBlockGroup,
++      long rangeEndInBlockGroup, byte[] buf, int offsetInBuf) {
++    // Step 1: map the byte range to StripingCells
++    final StripingCell[] overlappedCells = getStripingCellsOfByteRange(
++        ecPolicy, cellSize, blockGroup, rangeStartInBlockGroup,
++        rangeEndInBlockGroup);
++    // Step 2: get the unmerged ranges on each internal block
++    final VerticalRange[] blockRanges = getRangesForInternalBlocks(
++        ecPolicy, cellSize, overlappedCells);
++    // Step 3: merge into at most 5 stripes
++    final AlignedStripe[] alignedStripes = mergeRangesForInternalBlocks(
++        ecPolicy, blockRanges);
++    // Step 4: calculate each chunk's position in destination buffer
++    calcualteChunkPositionsInBuf(cellSize, alignedStripes, overlappedCells,
++        buf, offsetInBuf);
++    // Step 5: prepare ALLZERO blocks
++    prepareAllZeroChunks(blockGroup, alignedStripes, cellSize,
++        ecPolicy.getNumDataUnits());
++    return alignedStripes;
++  }
++  /**
++   * Map the logical byte range to a set of inclusive {@link StripingCell}
++   * instances, each representing the overlap of the byte range to a cell
++   * used by {@link DFSStripedOutputStream} in encoding
++   *
++   * @param ecPolicy The codec policy for the file
++   * @param cellSize Cell size of stripe
++   * @param blockGroup The striped block group
++   * @param rangeStartInBlockGroup The byte range's start offset in block
++   *                               group
++   * @param rangeEndInBlockGroup The byte range's end offset in block group;
++   *                             must not be lower than the start offset and
++   *                             must be lower than the block group's size
++   * @return The cells overlapping the byte range, in logical order
++   */
++  @VisibleForTesting
++  private static StripingCell[] getStripingCellsOfByteRange(
++      ErasureCodingPolicy ecPolicy, int cellSize,
++      LocatedStripedBlock blockGroup,
++      long rangeStartInBlockGroup, long rangeEndInBlockGroup) {
++    Preconditions.checkArgument(
++        rangeStartInBlockGroup <= rangeEndInBlockGroup &&
++            rangeEndInBlockGroup < blockGroup.getBlockSize());
++    long len = rangeEndInBlockGroup - rangeStartInBlockGroup + 1;
++    int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
++    int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize);
++    int numCells = lastCellIdxInBG - firstCellIdxInBG + 1;
++    StripingCell[] cells = new StripingCell[numCells];
++    // The first cell may be entered in its middle and, when the range lies
++    // within a single cell, may also end before the cell does.
++    final int firstCellOffset = (int) (rangeStartInBlockGroup % cellSize);
++    final int firstCellSize =
++        (int) Math.min(cellSize - (rangeStartInBlockGroup % cellSize), len);
++    cells[0] = new StripingCell(ecPolicy, firstCellSize, firstCellIdxInBG,
++        firstCellOffset);
++    // The last cell always starts at its beginning but may end early
++    if (lastCellIdxInBG != firstCellIdxInBG) {
++      final int lastCellSize = (int) (rangeEndInBlockGroup % cellSize) + 1;
++      cells[numCells - 1] = new StripingCell(ecPolicy, lastCellSize,
++          lastCellIdxInBG, 0);
++    }
++    // Every cell in between is covered completely
++    for (int i = 1; i < numCells - 1; i++) {
++      cells[i] = new StripingCell(ecPolicy, cellSize, i + firstCellIdxInBG,
++          0);
++    }
++    return cells;
++  }
++  /**
++   * Given a logical byte range, mapped to each {@link StripingCell},
++   * calculate the physical byte range (inclusive) on each stored internal
++   * block.
++   *
++   * @param ecPolicy The codec policy for the file, which carries the numbers
++   *                 of data / parity blocks
++   * @param cellSize Cell size of stripe
++   * @param cells The cells the logical byte range was mapped to
++   * @return One range per internal block; the entry of a data block that no
++   *         cell maps to is null
++   */
++  @VisibleForTesting
++  private static VerticalRange[] getRangesForInternalBlocks(
++      ErasureCodingPolicy ecPolicy, int cellSize, StripingCell[] cells) {
++    int dataBlkNum = ecPolicy.getNumDataUnits();
++    int parityBlkNum = ecPolicy.getNumParityUnits();
++    VerticalRange[] ranges = new VerticalRange[dataBlkNum + parityBlkNum];
++    long earliestStart = Long.MAX_VALUE;
++    long latestEnd = -1;
++    for (StripingCell cell : cells) {
++      // iterate through all cells and update the list of StripeRanges
++      if (ranges[cell.idxInStripe] == null) {
++        // Multiply in long: the int product wraps for offsets over 2 GB
++        ranges[cell.idxInStripe] = new VerticalRange(
++            (long) cell.idxInInternalBlk * cellSize + cell.offset,
++            cell.size);
++      } else {
++        ranges[cell.idxInStripe].spanInBlock += cell.size;
++      }
++      VerticalRange range = ranges[cell.idxInStripe];
++      if (range.offsetInBlock < earliestStart) {
++        earliestStart = range.offsetInBlock;
++      }
++      if (range.offsetInBlock + range.spanInBlock - 1 > latestEnd) {
++        latestEnd = range.offsetInBlock + range.spanInBlock - 1;
++      }
++    }
++    // Each parity block should be fetched at maximum range of all data blocks
++    for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
++      ranges[i] = new VerticalRange(earliestStart,
++          latestEnd - earliestStart + 1);
++    }
++    return ranges;
++  }
++  /**
++   * Merge byte ranges on each internal block into a set of inclusive
++   * {@link AlignedStripe} instances.
++   */
++  private static AlignedStripe[] mergeRangesForInternalBlocks(
++      ErasureCodingPolicy ecPolicy, VerticalRange[] ranges) {
++    int dataBlkNum = ecPolicy.getNumDataUnits();
++    int parityBlkNum = ecPolicy.getNumParityUnits();
++    List<AlignedStripe> stripes = new ArrayList<>();
++    SortedSet<Long> stripePoints = new TreeSet<>();
++    for (VerticalRange r : ranges) {
++      if (r != null) {
++        stripePoints.add(r.offsetInBlock);
++        stripePoints.add(r.offsetInBlock + r.spanInBlock);
++      }
++    }
++    long prev = -1;
++    for (long point : stripePoints) {
++      if (prev >= 0) {
++        stripes.add(new AlignedStripe(prev, point - prev,
++            dataBlkNum + parityBlkNum));
++      }
++      prev = point;
++    }
++    return stripes.toArray(new AlignedStripe[stripes.size()]);
++  }
++  /**
++   * Calculate where in the destination buffer each requested chunk of each
++   * stripe has to be stored, and register those buffer slices with the
++   * chunks.
++   *
++   * @param cellSize Cell size of stripe
++   * @param stripes The stripes whose chunks are created or extended
++   * @param cells The cells of the requested byte range, in logical order
++   * @param buf Destination buffer of the read operation
++   * @param offsetInBuf Start offset into the destination buffer
++   */
++  private static void calcualteChunkPositionsInBuf(int cellSize,
++      AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
++      int offsetInBuf) {
++    /**
++     *     | <--------------- AlignedStripe --------------->|
++     *
++     *     |<- length_0 ->|<--  length_1  -->|<- length_2 ->|
++     * +------------------+------------------+----------------+
++     * |    cell_0_0_0    |    cell_3_1_0    |   cell_6_2_0   |  <- blk_0
++     * +------------------+------------------+----------------+
++     *   _/                \_______________________
++     *  |                                          |
++     *  v offset_0                                 v offset_1
++     * +----------------------------------------------------------+
++     * |  cell_0_0_0 |  cell_1_0_1 and cell_2_0_2  |cell_3_1_0 ...|   <- buf
++     * |  (partial)  |    (from blk_1 and blk_2)   |              |
++     * +----------------------------------------------------------+
++     *
++     * Cell indexing convention defined in {@link StripingCell}
++     */
++    // Bytes of the logical range already placed in the buffer
++    int done = 0;
++    for (StripingCell cell : cells) {
++      // Multiply in long: the int product wraps for block offsets over 2 GB
++      long cellStart = (long) cell.idxInInternalBlk * cellSize + cell.offset;
++      long cellEnd = cellStart + cell.size - 1;
++      for (AlignedStripe s : stripes) {
++        long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
++        long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
++        long overlapEnd = Math.min(cellEnd, stripeEnd);
++        int overLapLen = (int) (overlapEnd - overlapStart + 1);
++        if (overLapLen <= 0) {
++          continue;
++        }
++        if (s.chunks[cell.idxInStripe] == null) {
++          s.chunks[cell.idxInStripe] = new StripingChunk(buf);
++        }
++        s.chunks[cell.idxInStripe].addByteArraySlice(
++            (int)(offsetInBuf + done + overlapStart - cellStart), overLapLen);
++      }
++      done += cell.size;
++    }
++  }
++  /**
++   * If a {@link StripingChunk} maps to a byte range beyond an internal
++   * block's size, the chunk should be treated as zero bytes in decoding.
++   *
++   * @param blockGroup The striped block group
++   * @param stripes The stripes whose data chunks are checked
++   * @param cellSize Cell size of stripe
++   * @param dataBlkNum The number of data blocks
++   */
++  private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup,
++      AlignedStripe[] stripes, int cellSize, int dataBlkNum) {
++    for (AlignedStripe s : stripes) {
++      for (int i = 0; i < dataBlkNum; i++) {
++        long internalBlkLen = getInternalBlockLength(
++            blockGroup.getBlockSize(),
++            cellSize, dataBlkNum, i);
++        // The internal block ends before this stripe even starts
++        if (internalBlkLen <= s.getOffsetInBlock()) {
++          Preconditions.checkState(s.chunks[i] == null);
++          s.chunks[i] = new StripingChunk(StripingChunk.ALLZERO);
++        }
++      }
++    }
++  }
++  /**
++   * Cell is the unit of encoding used in {@link DFSStripedOutputStream}. This
++   * size impacts how a logical offset in the file or block group translates
++   * to physical byte offset in a stored internal block. The StripingCell util
++   * class facilitates this calculation. Each StripingCell is inclusive with
++   * its start and end offsets -- e.g., the end logical offset of cell_0_0_0
++   * should be 1 byte lower than the start logical offset of cell_1_0_1.
++   *
++   *  | <------- Striped Block Group -------> |
++   *    blk_0          blk_1          blk_2
++   *      |              |              |
++   *      v              v              v
++   * +----------+   +----------+   +----------+
++   * |cell_0_0_0|   |cell_1_0_1|   |cell_2_0_2|
++   * +----------+   +----------+   +----------+
++   * |cell_3_1_0|   |cell_4_1_1|   |cell_5_1_2| <- {@link #idxInBlkGroup} = 5
++   * +----------+   +----------+   +----------+    {@link #idxInInternalBlk} = 1
++   *                                               {@link #idxInStripe} = 2
++   * A StripingCell is a special instance of {@link StripingChunk} whose offset
++   * and size align with the cell used when writing data.
++   * TODO: consider parity cells
++   */
++  @VisibleForTesting
++  static class StripingCell {
++    final ErasureCodingPolicy ecPolicy;
++    /** Logical order in a block group, used when doing I/O to a block group */
++    final int idxInBlkGroup;
++    /** Index of the cell among the cells of its internal block */
++    final int idxInInternalBlk;
++    /** Index of the internal block, within the stripe, holding the cell */
++    final int idxInStripe;
++    /**
++     * When a logical byte range is mapped to a set of cells, it might
++     * partially overlap with the first and last cells. This field and the
++     * {@link #size} variable represent the start offset and size of the
++     * overlap.
++     */
++    final int offset;
++    final int size;
++    /**
++     * @param ecPolicy The codec policy, which carries the number of data
++     *                 blocks used to locate the cell
++     * @param cellSize The number of bytes of the cell that are covered; it
++     *                 is stored as {@link #size}
++     * @param idxInBlkGroup Logical order of the cell in the block group
++     * @param offset Start offset of the covered bytes within the cell
++     */
++    StripingCell(ErasureCodingPolicy ecPolicy, int cellSize,
++        int idxInBlkGroup, int offset) {
++      this.ecPolicy = ecPolicy;
++      this.idxInBlkGroup = idxInBlkGroup;
++      this.idxInInternalBlk = idxInBlkGroup / ecPolicy.getNumDataUnits();
++      this.idxInStripe = idxInBlkGroup -
++          this.idxInInternalBlk * ecPolicy.getNumDataUnits();
++      this.offset = offset;
++      this.size = cellSize;
++    }
++  }
++  /**
++   * Given a requested byte range on a striped block group, an AlignedStripe
++   * represents an inclusive {@link VerticalRange} that is aligned with both
++   * the byte range and boundaries of all internal blocks. As illustrated in
++   * the diagram, any given byte range on a block group leads to 1~5
++   * AlignedStripe's.
++   *
++   * |<-------- Striped Block Group -------->|
++   * blk_0   blk_1   blk_2      blk_3   blk_4
++   *                 +----+  |  +----+  +----+
++   *                 |full|  |  |    |  |    | <- AlignedStripe0:
++   *         +----+  |~~~~|  |  |~~~~|  |~~~~|      1st cell is partial
++   *         |part|  |    |  |  |    |  |    | <- AlignedStripe1: byte range
++   * +----+  +----+  +----+  |  |~~~~|  |~~~~|      doesn't start at 1st block
++   * |full|  |full|  |full|  |  |    |  |    |
++   * |cell|  |cell|  |cell|  |  |    |  |    | <- AlignedStripe2 (full stripe)
++   * |    |  |    |  |    |  |  |    |  |    |
++   * +----+  +----+  +----+  |  |~~~~|  |~~~~|
++   * |full|  |part|          |  |    |  |    | <- AlignedStripe3: byte range
++   * |~~~~|  +----+          |  |~~~~|  |~~~~|      doesn't end at last block
++   * |    |                  |  |    |  |    | <- AlignedStripe4:
++   * +----+                  |  +----+  +----+      last cell is partial
++   *                         |
++   * <---- data blocks ----> | <--- parity --->
++   *
++   * An AlignedStripe is the basic unit of reading from a striped block group,
++   * because within the AlignedStripe, all internal blocks can be processed in
++   * a uniform manner.
++   *
++   * The coverage of an AlignedStripe on an internal block is represented as a
++   * {@link StripingChunk}.
++   *
++   * To simplify the logic of reading a logical byte range from a block group,
++   * a StripingChunk is either completely in the requested byte range or
++   * completely outside the requested byte range.
++   */
++  public static class AlignedStripe {
++    /** The byte range this stripe covers on an internal block */
++    public VerticalRange range;
++    /** status of each chunk in the stripe */
++    public final StripingChunk[] chunks;
++    public int fetchedChunksNum = 0;
++    public int missingChunksNum = 0;
++    /**
++     * @param offsetInBlock Start offset of the stripe in an internal block;
++     *                      must not be negative
++     * @param length Number of bytes the stripe spans; must not be negative
++     * @param width Number of chunks, i.e. internal blocks, in the stripe
++     */
++    public AlignedStripe(long offsetInBlock, long length, int width) {
++      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
++      this.range = new VerticalRange(offsetInBlock, length);
++      this.chunks = new StripingChunk[width];
++    }
++    /** whether a position in an internal block is within this stripe */
++    public boolean include(long pos) {
++      return range.include(pos);
++    }
++    public long getOffsetInBlock() {
++      return range.offsetInBlock;
++    }
++    public long getSpanInBlock() {
++      return range.spanInBlock;
++    }
++    @Override
++    public String toString() {
++      final StringBuilder text = new StringBuilder();
++      text.append("Offset=").append(range.offsetInBlock);
++      text.append(", length=").append(range.spanInBlock);
++      text.append(", fetchedChunksNum=").append(fetchedChunksNum);
++      text.append(", missingChunksNum=").append(missingChunksNum);
++      return text.toString();
++    }
++  }
++  /**
++   * A simple utility class representing an arbitrary vertical inclusive range
++   * starting at {@link #offsetInBlock} and lasting for {@link #spanInBlock}
++   * bytes in an internal block. Note that VerticalRange doesn't necessarily
++   * align with {@link StripingCell}.
++   *
++   * |<- Striped Block Group ->|
++   *  blk_0
++   *    |
++   *    v
++   * +-----+
++   * |~~~~~| <-- {@link #offsetInBlock}
++   * |     |  ^
++   * |     |  |
++   * |     |  | {@link #spanInBlock}
++   * |     |  v
++   * |~~~~~| ---
++   * |     |
++   * +-----+
++   */
++  public static class VerticalRange {
++    /** start offset in the internal block (inclusive) */
++    public long offsetInBlock;
++    /** number of bytes the range lasts for, starting at the offset */
++    public long spanInBlock;
++    /**
++     * @param offsetInBlock Start offset of the range; must not be negative
++     * @param length Number of bytes in the range; must not be negative
++     */
++    public VerticalRange(long offsetInBlock, long length) {
++      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
++      this.offsetInBlock = offsetInBlock;
++      this.spanInBlock = length;
++    }
++    /** whether a position is in the range */
++    public boolean include(long pos) {
++      if (pos < offsetInBlock) {
++        return false;
++      }
++      // The end of the range is exclusive
++      return pos < offsetInBlock + spanInBlock;
++    }
++  }
++  /**
++   * Indicates the coverage of an {@link AlignedStripe} on an internal block,
++   * and the state of the chunk in the context of the read request.
++   *
++   * |<---------------- Striped Block Group --------------->|
++   *   blk_0        blk_1        blk_2          blk_3   blk_4
++   *                           +---------+  |  +----+  +----+
++   *     null         null     |REQUESTED|  |  |null|  |null| <- AlignedStripe0
++   *              +---------+  |---------|  |  |----|  |----|
++   *     null     |REQUESTED|  |REQUESTED|  |  |null|  |null| <- AlignedStripe1
++   * +---------+  +---------+  +---------+  |  +----+  +----+
++   * |REQUESTED|  |REQUESTED|    ALLZERO    |  |null|  |null| <- AlignedStripe2
++   * +---------+  +---------+               |  +----+  +----+
++   * <----------- data blocks ------------> | <--- parity --->
++   */
++  public static class StripingChunk {
++    /** The chunk has been fetched successfully. */
++    public static final int FETCHED = 0x01;
++    /** Fetching the chunk has failed. */
++    public static final int MISSING = 0x02;
++    /** The chunk is being fetched (a fetching task is in flight). */
++    public static final int PENDING = 0x04;
++    /**
++     * The chunk is requested, either by the application or for decoding, and
++     * a read task still needs to be scheduled.
++     */
++    public static final int REQUESTED = 0x08;
++    /**
++     * The internal block is short and has no overlap with the chunk. The
++     * chunk is considered all-zero bytes in codec calculations.
++     */
++    public static final int ALLZERO = 0x0f;
++    /**
++     * If a chunk is completely in requested range, the state transition is:
++     * REQUESTED (when AlignedStripe created) -> PENDING -> {FETCHED | MISSING}
++     * If a chunk is completely outside requested range (including parity
++     * chunks), state transition is:
++     * null (AlignedStripe created) -> REQUESTED (upon failure) -> PENDING ...
++     */
++    public int state = REQUESTED;
++    /** Array-backed storage; null unless constructed from a byte array. */
++    public final ChunkByteArray byteArray;
++    /** Buffer-backed storage; null unless constructed from a ByteBuffer. */
++    public final ByteBuffer byteBuffer;
++    /** Creates an array-backed chunk; the state keeps its default value. */
++    public StripingChunk(byte[] buf) {
++      this.byteBuffer = null;
++      this.byteArray = new ChunkByteArray(buf);
++    }
++    /** Creates a buffer-backed chunk; the state keeps its default value. */
++    public StripingChunk(ByteBuffer buf) {
++      this.byteBuffer = buf;
++      this.byteArray = null;
++    }
++    /** Creates a chunk without any backing storage in the given state. */
++    public StripingChunk(int state) {
++      this.state = state;
++      this.byteBuffer = null;
++      this.byteArray = null;
++    }
++    /**
++     * Appends one (offset, length) slice to the backing byte array's slice
++     * lists. Only meaningful for array-backed chunks.
++     */
++    public void addByteArraySlice(int offset, int length) {
++      assert byteArray != null;
++      final ChunkByteArray slices = byteArray;
++      slices.offsetsInBuf.add(offset);
++      slices.lengthsInBuf.add(length);
++    }
++    /** Delegates to {@link ChunkByteArray#copyTo(byte[])}. */
++    void copyTo(byte[] target) {
++      assert byteArray != null;
++      this.byteArray.copyTo(target);
++    }
++    /** Delegates to {@link ChunkByteArray#copyFrom(byte[])}. */
++    void copyFrom(byte[] src) {
++      assert byteArray != null;
++      this.byteArray.copyFrom(src);
++    }
++  }
++  /**
++   * A byte array together with an ordered list of (offset, length) slices
++   * into it. {@link #copyTo(byte[])} and {@link #copyFrom(byte[])} move data
++   * between those slices and a contiguous array.
++   */
++  public static class ChunkByteArray {
++    private final byte[] buf;
++    private final List<Integer> offsetsInBuf;
++    private final List<Integer> lengthsInBuf;
++    ChunkByteArray(byte[] buf) {
++      this.offsetsInBuf = new ArrayList<>();
++      this.lengthsInBuf = new ArrayList<>();
++      this.buf = buf;
++    }
++    /** @return a fresh array holding the slice offsets in insertion order */
++    public int[] getOffsets() {
++      return unbox(offsetsInBuf);
++    }
++    /** @return a fresh array holding the slice lengths in insertion order */
++    public int[] getLengths() {
++      return unbox(lengthsInBuf);
++    }
++    /** @return the backing array itself (not a copy) */
++    public byte[] buf() {
++      return this.buf;
++    }
++    /**
++     * Gathers every slice of the backing array into {@code target}, packing
++     * them back to back starting at index 0.
++     */
++    void copyTo(byte[] target) {
++      final int sliceCount = offsetsInBuf.size();
++      int written = 0;
++      for (int s = 0; s < sliceCount; s++) {
++        final int from = offsetsInBuf.get(s);
++        final int len = lengthsInBuf.get(s);
++        System.arraycopy(buf, from, target, written, len);
++        written += len;
++      }
++    }
++    /**
++     * Scatters {@code src}, read contiguously from index 0, into the slices
++     * of the backing array.
++     */
++    void copyFrom(byte[] src) {
++      final int sliceCount = offsetsInBuf.size();
++      int consumed = 0;
++      for (int s = 0; s < sliceCount; s++) {
++        final int to = offsetsInBuf.get(s);
++        final int len = lengthsInBuf.get(s);
++        System.arraycopy(src, consumed, buf, to, len);
++        consumed += len;
++      }
++    }
++    /** Converts a list of boxed integers into a primitive array. */
++    private static int[] unbox(List<Integer> values) {
++      final int[] out = new int[values.size()];
++      int next = 0;
++      for (int v : values) {
++        out[next++] = v;
++      }
++      return out;
++    }
++  }
++  /**
++   * This class represents result from a striped read request.
++   * If the task was successful or the internal computation failed,
++   * an index is also returned.
++   */
++  public static class StripingChunkReadResult {
++    public static final int SUCCESSFUL = 0x01;
++    public static final int FAILED = 0x02;
++    public static final int TIMEOUT = 0x04;
++    public static final int CANCELLED = 0x08;
++    /** Index attached to the result; -1 when built without an index. */
++    public final int index;
++    /** One of the state constants declared in this class. */
++    public final int state;
++    /**
++     * Builds a result that carries no index (index is set to -1). The
++     * precondition check only lets {@link #TIMEOUT} through.
++     */
++    public StripingChunkReadResult(int state) {
++      final boolean timedOut = (state == TIMEOUT);
++      Preconditions.checkArgument(timedOut,
++          "Only timeout result should return negative index.");
++      this.state = state;
++      this.index = -1;
++    }
++    /**
++     * Builds a result for the given index. The precondition check rejects
++     * {@link #TIMEOUT}, which must use the single-argument constructor.
++     */
++    public StripingChunkReadResult(int index, int state) {
++      final boolean timedOut = (state == TIMEOUT);
++      Preconditions.checkArgument(!timedOut,
++          "Timeout result should return negative index.");
++      this.state = state;
++      this.index = index;
++    }
++    @Override
++    public String toString() {
++      final StringBuilder text = new StringBuilder("(index=");
++      text.append(index).append(", state =").append(state).append(")");
++      return text.toString();
++    }
++  }
++  /**
++   * Check if the information such as IDs and generation stamps in block-i
++   * match the block group.
++   */
++  public static void checkBlocks(ExtendedBlock blockGroup,
++      int i, ExtendedBlock blocki) throws IOException {
++    // Pick the first failing property, in the order: block pool ID,
++    // block ID (expected to be the group ID plus i), generation stamp.
++    final String mismatched;
++    if (!blocki.getBlockPoolId().equals(blockGroup.getBlockPoolId())) {
++      mismatched = "Block pool IDs";
++    } else if (blocki.getBlockId() - i != blockGroup.getBlockId()) {
++      mismatched = "Block IDs";
++    } else if (blocki.getGenerationStamp()
++        != blockGroup.getGenerationStamp()) {
++      mismatched = "Generation stamps";
++    } else {
++      // Everything matches: nothing to report.
++      return;
++    }
++    throw new IOException(mismatched + " mismatched: block" + i + "="
++        + blocki + ", expected block group=" + blockGroup);
++  }
++  /**
++   * Returns the low four bits of the reported block's ID.
++   * NOTE(review): presumably this is the index of the internal block within
++   * its block group -- confirm against the block ID allocation scheme.
++   *
++   * @param reportedBlock block whose ID is inspected
++   * @return the block ID masked with 15, as an int in the range 0..15
++   */
++  public static int getBlockIndex(Block reportedBlock) {
++    // 15 == 0xF: keeps only the four least significant bits of the ID.
++    final long blockGroupIndexMask = 15;
++    return (int) (reportedBlock.getBlockId() & blockGroupIndexMask);
++  }
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index b28ab42,0e2d541..d35fb57
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@@ -648,3 -444,3 +478,11 @@@ message RollingUpgradeStatusProto 
    required string blockPoolId = 1;
    optional bool finalized = 2 [default = false];
++ * A list of storage IDs.
++ */
++message StorageUuidsProto {
++  repeated string storageUuids = 1;
diff --cc 
index 8874c4d,b631955..0166029
@@@ -402,14 -397,11 +400,19 @@@ public class DFSConfigKeys extends Comm
    public static final int     DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 
    public static final String  DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = 
    public static final int     DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
 +  public static final String  DFS_DATANODE_STRIPED_READ_THREADS_KEY = 
 +  public static final int     DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20;
 +  public static final String  DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = 
 +  public static final int     DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 
64 * 1024;
 +  public static final String  DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY = 
 +  public static final int     
 +  public static final String  DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY = 
 +  public static final int     
+   public static final String
+       "";
+   public static final int
    public static final String  DFS_DATANODE_DNS_INTERFACE_KEY = 
    public static final String  DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
    public static final String  DFS_DATANODE_DNS_NAMESERVER_KEY = 
diff --cc 
index 7f721f0,5b11ac2..b0ea7ce
@@@ -1442,34 -1439,5 +1441,4 @@@ public class DFSUtil 
      return cryptoProvider;
-   public static int getIoFileBufferSize(Configuration conf) {
-     return conf.getInt(
-       CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
-       CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-   }
-   public static int getSmallBufferSize(Configuration conf) {
-     return Math.min(getIoFileBufferSize(conf) / 2, 512);
-   }
-   /**
-    * Probe for HDFS Encryption being enabled; this uses the value of
-    * the option {@link DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI},
-    * returning true if that property contains a non-empty, non-whitespace
-    * string.
-    * @param conf configuration to probe
-    * @return true if encryption is considered enabled.
-    */
-   public static boolean isHDFSEncryptionEnabled(Configuration conf) {
-     return !conf.getTrimmed(
-         DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
-   }
-   public static InterruptedIOException toInterruptedIOException(String 
-       InterruptedException e) {
-     final InterruptedIOException iioe = new InterruptedIOException(message);
-     iioe.initCause(e);
-     return iioe;
-   }
diff --cc 
index 524248c,75b3811..05c498f
@@@ -47,36 -45,27 +47,35 @@@ import org.apache.hadoop.hdfs.protocol.
 +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
- import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
- import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
- import 
- import 
- import 
- import 
++import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
- import 
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
- import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
- import 
- import 
- import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto;
- import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RecoveringBlockProto;
- import 
- import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
- import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto;
- import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
- import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
+ import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlockKeyProto;
+ import 
+ import 
+ import 
+ import 
+ import 
+ import 
+ import 
+ import 
+ import 
+ import 
+ import 
+ import 
+ import 
+ import 
diff --cc 
index 9228bec,2646089..43ddf74
@@@ -364,10 -368,9 +370,9 @@@ public class DataNode extends Reconfigu
    private String supergroup;
    private boolean isPermissionEnabled;
    private String dnUserName = null;
-   private SpanReceiverHost spanReceiverHost;
 +  private ErasureCodingWorker ecWorker;
+   final Tracer tracer;
+   private final TracerConfigurationManager tracerConfigurationManager;
    private static final int NUM_CORES = Runtime.getRuntime()
    private static final double CONGESTION_RATIO = 1.5;
@@@ -3289,12 -3287,8 +3320,12 @@@
    public void removeSpanReceiver(long id) throws IOException {
-     spanReceiverHost.removeSpanReceiver(id);
+     tracerConfigurationManager.removeSpanReceiver(id);
 +  public ErasureCodingWorker getErasureCodingWorker(){
 +    return ecWorker;
 +  }
     * Get timeout value of each OOB type from configuration

Reply via email to