This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new e82682f  HDDS-5551. EC: Implement an Input Stream to reconstruct EC 
blocks on demand (#2797)
e82682f is described below

commit e82682fe8b4431b565201165278b96f24ec28303
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Tue Nov 9 18:15:50 2021 +0000

    HDDS-5551. EC: Implement an Input Stream to reconstruct EC blocks on demand 
(#2797)
---
 .../hadoop/ozone/client/io/ECBlockInputStream.java | 107 ++--
 .../io/ECBlockReconstructedStripeInputStream.java  | 415 ++++++++++++++
 .../client/io/InsufficientLocationsException.java  |  43 ++
 .../client/rpc/read/TestECBlockInputStream.java    |  10 -
 .../TestECBlockReconstructedStripeInputStream.java | 601 +++++++++++++++++++++
 5 files changed, 1133 insertions(+), 43 deletions(-)

diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index ae030b5..52b370b 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -55,7 +55,6 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
   private final Function<BlockID, Pipeline> refreshFunction;
   private final OmKeyLocationInfo blockInfo;
   private final DatanodeDetails[] dataLocations;
-  private final DatanodeDetails[] parityLocations;
   private final BlockExtendedInputStream[] blockStreams;
   private final int maxLocations;
 
@@ -63,6 +62,43 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
   private boolean closed = false;
   private boolean seeked = false;
 
+  protected OmKeyLocationInfo getBlockInfo() {
+    return blockInfo;
+  }
+
+  protected ECReplicationConfig getRepConfig() {
+    return repConfig;
+  }
+
+  protected DatanodeDetails[] getDataLocations() {
+    return dataLocations;
+  }
+
+  protected long getStripeSize() {
+    return stripeSize;
+  }
+
+  protected int availableDataLocations() {
+    int count = 0;
+    for (int i = 0; i < repConfig.getData(); i++) {
+      if (dataLocations[i] != null) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  protected int availableParityLocations() {
+    int count = 0;
+    for (int i = repConfig.getData();
+         i < repConfig.getData() + repConfig.getParity(); i++) {
+      if (dataLocations[i] != null) {
+        count++;
+      }
+    }
+    return count;
+  }
+
   public ECBlockInputStream(ECReplicationConfig repConfig,
       OmKeyLocationInfo blockInfo, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory, Function<BlockID,
@@ -75,33 +111,29 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
     this.xceiverClientFactory = xceiverClientFactory;
     this.refreshFunction = refreshFunction;
     this.maxLocations = repConfig.getData() + repConfig.getParity();
-    this.dataLocations = new DatanodeDetails[repConfig.getData()];
-    this.parityLocations = new DatanodeDetails[repConfig.getParity()];
+    this.dataLocations =
+        new DatanodeDetails[repConfig.getData() + repConfig.getParity()];
     this.blockStreams = new BlockExtendedInputStream[repConfig.getData()];
 
-    stripeSize = ecChunkSize * repConfig.getData();
+    this.stripeSize = (long)ecChunkSize * repConfig.getData();
     setBlockLocations(this.blockInfo.getPipeline());
   }
 
   public synchronized boolean hasSufficientLocations() {
-    // Until we implement "on the fly" recovery, all data location must be
-    // present and we have enough locations if that is the case.
-    //
     // The number of locations needed is a function of the EC Chunk size. If 
the
     // block length is <= the chunk size, we should only have location 1. If it
     // is greater than the chunk size but less than chunk_size * 2, then we 
must
     // have two locations. If it is greater than chunk_size * data_num, then we
     // must have all data_num locations.
-    int expectedDataBlocks =
-        (int)Math.min(
-            Math.ceil((double)blockInfo.getLength() / ecChunkSize),
-            repConfig.getData());
-    for (int i=0; i<expectedDataBlocks; i++) {
-      if (dataLocations[i] == null) {
-        return false;
-      }
-    }
-    return true;
+    // We only consider data locations here.
+    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
+    return expectedDataBlocks == availableDataLocations();
+  }
+
+  protected int calculateExpectedDataBlocks(ECReplicationConfig rConfig) {
+    return (int)Math.min(Math.ceil(
+        (double)getBlockInfo().getLength() / rConfig.getEcChunkSize()),
+        rConfig.getData());
   }
 
   /**
@@ -110,7 +142,7 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
    * stream reference. The block group index will be one greater than this.
    * @return
    */
-  private int currentStreamIndex() {
+  protected int currentStreamIndex() {
     return (int)((position / ecChunkSize) % repConfig.getData());
   }
 
@@ -120,9 +152,9 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
    * stream if it has not been opened already.
    * @return BlockInput stream to read from.
    */
-  private BlockExtendedInputStream getOrOpenStream() {
-    int ind = currentStreamIndex();
-    BlockExtendedInputStream stream = blockStreams[ind];
+  protected BlockExtendedInputStream getOrOpenStream(
+      int streamIndex, int locationIndex) {
+    BlockExtendedInputStream stream = blockStreams[streamIndex];
     if (stream == null) {
       // To read an EC block, we create a STANDALONE pipeline that contains the
       // single location for the block index we want to read. The EC blocks are
@@ -131,14 +163,14 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
       Pipeline pipeline = Pipeline.newBuilder()
           .setReplicationConfig(new StandaloneReplicationConfig(
               HddsProtos.ReplicationFactor.ONE))
-          .setNodes(Arrays.asList(dataLocations[ind]))
+          .setNodes(Arrays.asList(dataLocations[locationIndex]))
           .setId(PipelineID.randomId())
           .setState(Pipeline.PipelineState.CLOSED)
           .build();
 
       OmKeyLocationInfo blkInfo = new OmKeyLocationInfo.Builder()
           .setBlockID(blockInfo.getBlockID())
-          .setLength(internalBlockLength(ind+1))
+          .setLength(internalBlockLength(locationIndex+1))
           .setPipeline(blockInfo.getPipeline())
           .setToken(blockInfo.getToken())
           .setPartNumber(blockInfo.getPartNumber())
@@ -148,7 +180,7 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
           blkInfo, pipeline,
           blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
           refreshFunction);
-      blockStreams[ind] = stream;
+      blockStreams[streamIndex] = stream;
     }
     return stream;
   }
@@ -160,12 +192,19 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
    * @param index - Index number of the internal block, starting from 1
    * @return
    */
-  private long internalBlockLength(int index) {
+  protected long internalBlockLength(int index) {
     long lastStripe = blockInfo.getLength() % stripeSize;
     long blockSize = (blockInfo.getLength() - lastStripe) / 
repConfig.getData();
     long lastCell = lastStripe / ecChunkSize + 1;
     long lastCellLength = lastStripe % ecChunkSize;
 
+    if (index > repConfig.getData()) {
+      // Its a parity block and their size is driven by the size of the
+      // first block of the block group. All parity blocks have the same size
+      // as block_1.
+      index = 1;
+    }
+
     if (index < lastCell) {
       return blockSize + ecChunkSize;
     } else if (index == lastCell) {
@@ -187,18 +226,14 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
       throw new IndexOutOfBoundsException("The index " + index + " is greater "
           + "than the EC Replication Config (" + repConfig + ")");
     }
-    if (index <= repConfig.getData()) {
-      dataLocations[index - 1] = location;
-    } else {
-      parityLocations[index - repConfig.getData() - 1] = location;
-    }
+    dataLocations[index - 1] = location;
   }
 
-  private long blockLength() {
+  protected long blockLength() {
     return blockInfo.getLength();
   }
 
-  private long remaining() {
+  protected long remaining() {
     return blockLength() - position;
   }
 
@@ -222,7 +257,9 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
 
     int totalRead = 0;
     while(strategy.getTargetLength() > 0 && remaining() > 0) {
-      BlockExtendedInputStream stream = getOrOpenStream();
+      int currentIndex = currentStreamIndex();
+      BlockExtendedInputStream stream =
+          getOrOpenStream(currentIndex, currentIndex);
       int read = readFromStream(stream, strategy);
       totalRead += read;
       position += read;
@@ -349,6 +386,10 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
     return position;
   }
 
+  protected synchronized void setPos(long pos) {
+    position = pos;
+  }
+
   @Override
   public synchronized boolean seekToNewSource(long l) throws IOException {
     return false;
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
new file mode 100644
index 0000000..674c1c6
--- /dev/null
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.ozone.erasurecode.CodecRegistry;
+import org.apache.ozone.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Function;
+
+/**
+ * Class to read EC encoded data from blocks a stripe at a time, when some of
+ * the data blocks are not available. The public API for this class is:
+ *
+ *     readStripe(ByteBuffer[] bufs)
+ *
+ * The other inherited public APIs will throw a NotImplementedException. This 
is
+ * because this class is intended to only read full stripes into a reusable set
+ * of bytebuffers, and the tradition read APIs do not facilitate this.
+ *
+ * The caller should pass an array of ByteBuffers to readStripe() which:
+ *
+ * 1. Have EC DataNum buffers in the array.
+ * 2. Each buffer should have its position set to zero
+ * 3. Each buffer should have ecChunkSize remaining
+ *
+ * These buffers are either read into directly from the data blocks on the
+ * datanodes, or they will be reconstructed from parity data using the EC
+ * decoder.
+ *
+ * The EC Decoder expects to receive an array of elements matching EC Data + EC
+ * Parity elements long. Missing or not needed elements should be set to null
+ * in the array. The elements should be assigned to the array in EC index 
order.
+ *
+ * Assuming we have n missing data locations, where n <= parity locations, the
+ * ByteBuffers passed in from the client are either assigned to the decoder
+ * input array, or they are assigned to the decoder output array, where
+ * reconstructed data is written. The required number of parity buffers will be
+ * assigned and added to the decoder input so it has sufficient locations to
+ * reconstruct the data. After reconstruction the byte buffers received will
+ * have the data for a full stripe populated, either by reading directly from
+ * the block or by reconstructing the data.
+ *
+ * The buffers are returned "ready to read" with the position at zero and
+ * remaining() indicating how much data was read. If the remaining data is less
+ * than a full stripe, the client can simply read upto remaining from each
+ * buffer in turn. If there is a full stripe, each buffer should have ecChunk
+ * size remaining.
+ */
+public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ECBlockReconstructedStripeInputStream.class);
+
+  // List of buffers, data + parity long, needed by the EC decoder. Missing
+  // or not-need locations will be null.
+  private ByteBuffer[] decoderInputBuffers;
+  // Missing chunks are recovered into these buffers.
+  private ByteBuffer[] decoderOutputBuffers;
+  // Missing indexes to be recovered into the recovered buffers. Required by 
the
+  // EC decoder
+  private int[] missingIndexes;
+  // The blockLocation indexes to use to read data into the dataBuffers.
+  private List<Integer> dataIndexes = new ArrayList<>();
+
+  private final RawErasureDecoder decoder;
+
+  private boolean initialized = false;
+
+  public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+       XceiverClientFactory xceiverClientFactory, Function<BlockID,
+      Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
+    super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
+        refreshFunction, streamFactory);
+
+    decoder = CodecRegistry.getInstance()
+        .getCodecFactory(repConfig.getCodec().toString())
+        .createDecoder(repConfig);
+  }
+
+  protected void init() throws InsufficientLocationsException {
+    if (!hasSufficientLocations()) {
+      throw new InsufficientLocationsException("There are not enough " +
+          "datanodes to read the EC block");
+    }
+
+    ECReplicationConfig repConfig = getRepConfig();
+    // The EC decoder needs an array data+parity long, with missing or not
+    // needed indexes set to null.
+    decoderInputBuffers = new ByteBuffer[
+        getRepConfig().getData() + getRepConfig().getParity()];
+    DatanodeDetails[] locations = getDataLocations();
+    setMissingIndexesAndDataLocations(locations);
+    List<Integer> parityIndexes =
+        selectParityIndexes(locations, missingIndexes.length);
+    // We read from the selected parity blocks, so add them to the data 
indexes.
+    dataIndexes.addAll(parityIndexes);
+    // The decoder inputs originally start as all nulls. Then we populate the
+    // pieces we have data for. The parity buffers are reused for the block
+    // so we can allocated them now.
+    for (Integer i : parityIndexes) {
+      decoderInputBuffers[i] = allocateBuffer(repConfig);
+    }
+    decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
+    initialized = true;
+  }
+
+  /**
+   * Determine which indexes are missing, taking into account the length of the
+   * block. For a block shorter than a full EC stripe, it is expected that
+   * some of the data locations will not be present.
+   * Populates the missingIndex and dataIndexes instance variables.
+   * @param locations Available locations for the block group
+   */
+  private void setMissingIndexesAndDataLocations(DatanodeDetails[] locations) {
+    ECReplicationConfig repConfig = getRepConfig();
+    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
+    List<Integer> missingInd = new ArrayList<>();
+    for (int i = 0; i < repConfig.getData(); i++) {
+      if (locations[i] == null && i < expectedDataBlocks) {
+        missingInd.add(i);
+      } else if (locations[i] != null) {
+        dataIndexes.add(i);
+      }
+    }
+    missingIndexes = missingInd.stream().mapToInt(Integer::valueOf).toArray();
+  }
+
+  private void assignBuffers(ByteBuffer[] bufs) {
+    ECReplicationConfig repConfig = getRepConfig();
+    Preconditions.assertTrue(bufs.length == repConfig.getData());
+    int recoveryIndex = 0;
+    // Here bufs come from the caller and will be filled with data read from
+    // the blocks or recovered. Therefore, if the index is missing, we assign
+    // the buffer to the decoder outputs, where data is recovered via EC
+    // decoding. Otherwise the buffer is set to the input. Note, it may be a
+    // buffer which needs padded.
+    for (int i = 0; i < repConfig.getData(); i++) {
+      if (isMissingIndex(i)) {
+        decoderOutputBuffers[recoveryIndex++] = bufs[i];
+      } else {
+        decoderInputBuffers[i] = bufs[i];
+      }
+    }
+  }
+
+  private boolean isMissingIndex(int ind) {
+    for (int i : missingIndexes) {
+      if (i == ind) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * This method should be passed a list of byteBuffers which must contain EC
+   * Data Number entries. Each Bytebuffer should be at position 0 and have EC
+   * ChunkSize bytes remaining. After returning, the buffers will contain the
+   * data for the next stripe in the block. The buffers will be returned
+   * "ready to read" with their position set to zero and the limit set
+   * according to how much data they contain.
+   *
+   * @param bufs A list of byteBuffers which must contain EC Data Number
+   *             entries. Each Bytebuffer should be at position 0 and have
+   *             EC ChunkSize bytes remaining.
+   *
+   * @return The number of bytes read
+   * @throws IOException
+   */
+  public synchronized int readStripe(ByteBuffer[] bufs) throws IOException {
+    if (!initialized) {
+      init();
+    }
+    int toRead = (int)Math.min(getRemaining(), getStripeSize());
+    if (toRead == 0) {
+      return EOF;
+    }
+    validateBuffers(bufs);
+    assignBuffers(bufs);
+    clearParityBuffers();
+    loadDataBuffersFromStream();
+    padBuffers(toRead);
+    flipInputs();
+    decodeStripe();
+    unPadBuffers(bufs, toRead);
+    setPos(getPos() + toRead);
+    return toRead;
+  }
+
+  private void validateBuffers(ByteBuffer[] bufs) {
+    Preconditions.assertTrue(bufs.length == getRepConfig().getData());
+    int chunkSize = getRepConfig().getEcChunkSize();
+    for (ByteBuffer b : bufs) {
+      Preconditions.assertTrue(b.remaining() == chunkSize);
+    }
+  }
+
+  private void padBuffers(int toRead) {
+    int dataNum = getRepConfig().getData();
+    int parityNum = getRepConfig().getParity();
+    int chunkSize = getRepConfig().getEcChunkSize();
+    int fullChunks = toRead / chunkSize;
+    if (fullChunks == dataNum) {
+      // There is no padding to do - we are reading a full stripe.
+      return;
+    }
+    // The size of each chunk is governed by the size of the first chunk.
+    // The parity always matches the first chunk size.
+    int paritySize = Math.min(toRead, chunkSize);
+    // We never need to pad the first chunk - its length dictates the length
+    // of all others.
+    fullChunks = Math.max(1, fullChunks);
+    for (int i = fullChunks; i < dataNum; i++) {
+      ByteBuffer buf = decoderInputBuffers[i];
+      if (buf != null) {
+        buf.limit(paritySize);
+        zeroFill(buf);
+      }
+    }
+    // Ensure the available parity buffers are the expected length
+    for (int i = dataNum; i < dataNum + parityNum; i++) {
+      ByteBuffer b = decoderInputBuffers[i];
+      if (b != null) {
+        Preconditions.assertTrue(b.position() == paritySize);
+      }
+    }
+    // The output buffers need their limit set to the parity size
+    for (ByteBuffer b : decoderOutputBuffers) {
+      b.limit(paritySize);
+    }
+  }
+
+  private void unPadBuffers(ByteBuffer[] bufs, int toRead) {
+    int chunkSize = getRepConfig().getEcChunkSize();
+    int fullChunks = toRead / chunkSize;
+    int remainingLength = toRead % chunkSize;
+    if (fullChunks == getRepConfig().getData()) {
+      // We are reading a full stripe, no concerns over padding.
+      return;
+    }
+
+    if (fullChunks == 0){
+      // All buffers except the first contain no data.
+      for (int i = 1; i < bufs.length; i++) {
+        bufs[i].position(0);
+        bufs[i].limit(0);
+      }
+    } else {
+      // The first partial has the remaining length
+      bufs[fullChunks].limit(remainingLength);
+      // All others have a zero limit
+      for (int i = fullChunks + 1; i < bufs.length; i++) {
+        bufs[i].position(0);
+        bufs[i].limit(0);
+      }
+    }
+  }
+
+  private void zeroFill(ByteBuffer buf) {
+    // fill with zeros from pos to limit.
+    if (buf.hasArray()) {
+      byte[] a = buf.array();
+      Arrays.fill(a, buf.position(), buf.limit(), (byte)0);
+      buf.position(buf.limit());
+    } else {
+      while (buf.hasRemaining()) {
+        buf.put((byte)0);
+      }
+    }
+  }
+
+  /**
+   * Take the parity indexes which are available, shuffle them and truncate the
+   * list to the number of required parity chunks.
+   * @param locations The list of locations for all blocks in the block group/
+   * @param numRequired The number of parity chunks needed for reconstruction
+   * @return A list of indexes indicating which parity locations to read.
+   */
+  private List<Integer> selectParityIndexes(
+      DatanodeDetails[] locations, int numRequired) {
+    List<Integer> indexes = new ArrayList<>();
+    ECReplicationConfig repConfig = getRepConfig();
+    for (int i = repConfig.getData();
+         i < repConfig.getParity() + repConfig.getData(); i++) {
+      if (locations[i] != null) {
+        indexes.add(i);
+      }
+    }
+    Preconditions.assertTrue(indexes.size() >= numRequired);
+    Random rand = new Random();
+    while (indexes.size() > numRequired) {
+      indexes.remove(rand.nextInt(indexes.size()));
+    }
+    return indexes;
+  }
+
+  private ByteBuffer allocateBuffer(ECReplicationConfig repConfig) {
+    ByteBuffer buf = ByteBuffer.allocate(repConfig.getEcChunkSize());
+    return buf;
+  }
+
+  private void flipInputs() {
+    for (ByteBuffer b : decoderInputBuffers) {
+      if (b != null) {
+        b.flip();
+      }
+    }
+  }
+
+  private void clearParityBuffers() {
+    for (int i = getRepConfig().getData();
+         i < getRepConfig().getRequiredNodes(); i++) {
+      if (decoderInputBuffers[i] != null) {
+        decoderInputBuffers[i].clear();
+      }
+    }
+  }
+
+  protected void loadDataBuffersFromStream() throws IOException {
+    for (int i = 0; i < dataIndexes.size(); i++) {
+      BlockExtendedInputStream stream =
+          getOrOpenStream(i, dataIndexes.get(i));
+      ByteBuffer b = decoderInputBuffers[dataIndexes.get(i)];
+      while (b.hasRemaining()) {
+        int read = stream.read(b);
+        if (read == EOF) {
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Take the populated input buffers and missing indexes and create the
+   * outputs. Note that the input buffers have to be "ready for read", ie they
+   * need to have been flipped after their data was loaded. The created outputs
+   * are "ready to read" by the underlying decoder API, so there is no need to
+   * flip them after the call. The decoder reads all the inputs leaving the
+   * buffer position at the end, so the inputs are flipped after the decode so
+   * we have a complete set of "outputs" for the EC Stripe which are ready to
+   * read.
+   * @throws IOException
+   */
+  private void decodeStripe() throws IOException {
+    decoder.decode(decoderInputBuffers, missingIndexes, decoderOutputBuffers);
+    flipInputs();
+  }
+
+  @Override
+  public synchronized boolean hasSufficientLocations() {
+    // The number of locations needed is a function of the EC Chunk size. If 
the
+    // block length is <= the chunk size, we should only have one data 
location.
+    // If it is greater than the chunk size but less than chunk_size * 2, then
+    // we must have two locations. If it is greater than chunk_size * data_num,
+    // then we must have all data_num locations.
+    // The remaining data locations (for small block lengths) can be assumed to
+    // be all zeros.
+    // Then we need a total of dataNum blocks available across the available
+    // data, parity and padding blocks.
+    ECReplicationConfig repConfig = getRepConfig();
+    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
+    int availableLocations =
+        availableDataLocations() + availableParityLocations();
+    int paddedLocations = repConfig.getData() - expectedDataBlocks;
+
+    if (availableLocations + paddedLocations >= repConfig.getData()) {
+      return true;
+    } else {
+      LOG.warn("There are insufficient locations. {} available {} padded {} " +
+          "expected", availableLocations, paddedLocations, expectedDataBlocks);
+      return false;
+    }
+  }
+
+  @Override
+  protected int readWithStrategy(ByteReaderStrategy strategy) {
+    throw new NotImplementedException("readWithStrategy is not implemented. " +
+        "Use readStripe() instead");
+  }
+
+}
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java
new file mode 100644
index 0000000..956ed90
--- /dev/null
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown by EC Input Streams if there are not enough locations to
+ * read the EC data successfully.
+ */
+public class InsufficientLocationsException extends IOException {
+
+  public InsufficientLocationsException() {
+    super();
+  }
+
+  public InsufficientLocationsException(String message) {
+    super(message);
+  }
+
+  public InsufficientLocationsException(String message, Throwable ex) {
+    super(message, ex);
+  }
+
+  public InsufficientLocationsException(Throwable ex) {
+    super(ex);
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
index aa22a0b..1098eb4 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -63,7 +63,6 @@ public class TestECBlockInputStream {
   }
 
   @Test
-  // TODO - this test will need changed when we can do recovery reads.
   public void testSufficientLocations() {
     // EC-3-2, 5MB block, so all 3 data locations are needed
     OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
@@ -89,15 +88,6 @@ public class TestECBlockInputStream {
       Assert.assertTrue(ecb.hasSufficientLocations());
     }
 
-    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
-    dnMap.clear();
-    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
-    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
-    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
-        keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
-      Assert.assertFalse(ecb.hasSufficientLocations());
-    }
-
     // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
     // locations.
     dnMap.clear();
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
new file mode 100644
index 0000000..172aafe
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ozone.erasurecode.CodecRegistry;
+import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+
+/**
+ * Test for the ECBlockReconstructedStripeInputStream.
+ */
+public class TestECBlockReconstructedStripeInputStream {
+
+
+  private static final int ONEMB = 1024 * 1024;
+
+  private ECReplicationConfig repConfig;
+  private TestBlockInputStreamFactory streamFactory;
+
+  @Before
+  public void setup() {
+    repConfig = new ECReplicationConfig(3, 2,
+        ECReplicationConfig.EcCodec.RS, ONEMB);
+    streamFactory = new TestBlockInputStreamFactory();
+  }
+
+  @Test
+  public void testSufficientLocations() {
+    // One chunk, only 1 location.
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 1, ONEMB);
+    try (ECBlockInputStream ecb =
+        new ECBlockReconstructedStripeInputStream(repConfig,
+        keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+      Assert.assertTrue(ecb.hasSufficientLocations());
+    }
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // Two Chunks, but missing data block 2.
+    dnMap = createIndexMap(1, 4, 5);
+    keyInfo = createKeyInfo(repConfig, ONEMB * 2, dnMap);
+    try (ECBlockInputStream ecb =
+        new ECBlockReconstructedStripeInputStream(repConfig,
+        keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+      Assert.assertTrue(ecb.hasSufficientLocations());
+    }
+
+    // Three Chunks, but missing data block 2 and 3.
+    dnMap = createIndexMap(1, 4, 5);
+    keyInfo = createKeyInfo(repConfig, ONEMB * 3, dnMap);
+    try (ECBlockInputStream ecb =
+        new ECBlockReconstructedStripeInputStream(repConfig,
+        keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+      Assert.assertTrue(ecb.hasSufficientLocations());
+    }
+
+    // Three Chunks, but missing data block 2 and 3 and parity 1.
+    dnMap = createIndexMap(1, 4);
+    keyInfo = createKeyInfo(repConfig, ONEMB * 3, dnMap);
+    try (ECBlockInputStream ecb =
+        new ECBlockReconstructedStripeInputStream(repConfig,
+        keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+      Assert.assertFalse(ecb.hasSufficientLocations());
+    }
+  }
+
+  @Test
+  public void testReadFullStripesWithPartial() throws IOException {
+    // Generate the input data for 3 full stripes and generate the parity.
+    int chunkSize = repConfig.getEcChunkSize();
+    int partialStripeSize = chunkSize * 2 - 1;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 * 
chunkSize);
+    dataBufs[1].limit(4 * chunkSize - 1);
+    dataBufs[2].limit(3 * chunkSize);
+    for (ByteBuffer b : dataBufs) {
+      randomFill(b);
+    }
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+    List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
+    // Two data missing
+    locations.add(createIndexMap(1, 4, 5));
+    // One data missing
+    locations.add(createIndexMap(1, 2, 4, 5));
+    // Two data missing including first
+    locations.add(createIndexMap(2, 4, 5));
+    // One data and one parity missing
+    locations.add(createIndexMap(2, 3, 4));
+
+    for (Map<DatanodeDetails, Integer> dnMap : locations) {
+      streamFactory = new TestBlockInputStreamFactory();
+      addDataStreamsToFactory(dataBufs, parity);
+
+      OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+          stripeSize() * 3 + partialStripeSize, dnMap);
+      streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+      ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+      try (ECBlockReconstructedStripeInputStream ecb =
+          new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+              null, null, streamFactory)) {
+        // Read 3 full stripes
+        for (int i = 0; i < 3; i++) {
+          int read = ecb.readStripe(bufs);
+          for (int j = 0; j < bufs.length; j++) {
+            validateContents(dataBufs[j], bufs[j], i * chunkSize, chunkSize);
+          }
+          Assert.assertEquals(stripeSize(), read);
+
+          // Check the underlying streams have read 1 chunk per read:
+          for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+            Assert.assertEquals(chunkSize * (i + 1),
+                bis.getPos());
+          }
+          Assert.assertEquals(stripeSize() * (i + 1), ecb.getPos());
+          clearBuffers(bufs);
+        }
+        // The next read is a partial stripe
+        int read = ecb.readStripe(bufs);
+        Assert.assertEquals(partialStripeSize, read);
+        validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
+        validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
+        Assert.assertEquals(0, bufs[2].remaining());
+        Assert.assertEquals(0, bufs[2].position());
+
+        // A further read should give EOF
+        clearBuffers(bufs);
+        read = ecb.readStripe(bufs);
+        Assert.assertEquals(-1, read);
+      }
+    }
+  }
+
+  @Test
+  public void testReadPartialStripe() throws IOException {
+    int blockLength = repConfig.getEcChunkSize() - 1;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+    // First buffer has only the blockLength, the other two will have no data.
+    dataBufs[0].limit(blockLength);
+    dataBufs[1].limit(0);
+    dataBufs[2].limit(0);
+    for (ByteBuffer b : dataBufs) {
+      randomFill(b);
+    }
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+    addDataStreamsToFactory(dataBufs, parity);
+
+    ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+    // We have a length that is less than a single chunk, so blocks 2 and 3
+    // are padding and will not be present. Block 1 is lost and needs recovered
+    // from the parity and padded blocks 2 and 3.
+    Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+    OmKeyLocationInfo keyInfo =
+        createKeyInfo(repConfig, blockLength, dnMap);
+    streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+    try (ECBlockReconstructedStripeInputStream ecb =
+        new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+            null, null, streamFactory)) {
+      int read = ecb.readStripe(bufs);
+      Assert.assertEquals(blockLength, read);
+      validateContents(dataBufs[0], bufs[0], 0, blockLength);
+      Assert.assertEquals(0, bufs[1].remaining());
+      Assert.assertEquals(0, bufs[1].position());
+      Assert.assertEquals(0, bufs[2].remaining());
+      Assert.assertEquals(0, bufs[2].position());
+      // Check the underlying streams have been advanced by 1 blockLength:
+      for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+        Assert.assertEquals(blockLength, bis.getPos());
+      }
+      Assert.assertEquals(ecb.getPos(), blockLength);
+      clearBuffers(bufs);
+      // A further read should give EOF
+      read = ecb.readStripe(bufs);
+      Assert.assertEquals(-1, read);
+    }
+  }
+
+  @Test
+  public void testReadPartialStripeTwoChunks() throws IOException {
+    int chunkSize = repConfig.getEcChunkSize();
+    int blockLength = chunkSize * 2 - 1;
+
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+    // First buffer has only the blockLength, the other two will have no data.
+    dataBufs[0].limit(chunkSize);
+    dataBufs[1].limit(chunkSize - 1);
+    dataBufs[2].limit(0);
+    for (ByteBuffer b : dataBufs) {
+      randomFill(b);
+    }
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+    addDataStreamsToFactory(dataBufs, parity);
+
+    ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+    // We have a length that is less than a single chunk, so blocks 2 and 3
+    // are padding and will not be present. Block 1 is lost and needs recovered
+    // from the parity and padded blocks 2 and 3.
+    Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+    OmKeyLocationInfo keyInfo =
+        createKeyInfo(repConfig, blockLength, dnMap);
+    streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+    try (ECBlockReconstructedStripeInputStream ecb =
+        new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+            null, null, streamFactory)) {
+      int read = ecb.readStripe(bufs);
+      Assert.assertEquals(blockLength, read);
+      validateContents(dataBufs[0], bufs[0], 0, chunkSize);
+      validateContents(dataBufs[1], bufs[1], 0, chunkSize - 1);
+      Assert.assertEquals(0, bufs[2].remaining());
+      Assert.assertEquals(0, bufs[2].position());
+      // Check the underlying streams have been advanced by 1 chunk:
+      for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+        Assert.assertEquals(chunkSize, bis.getPos());
+      }
+      Assert.assertEquals(ecb.getPos(), blockLength);
+      clearBuffers(bufs);
+      // A further read should give EOF
+      read = ecb.readStripe(bufs);
+      Assert.assertEquals(-1, read);
+    }
+  }
+
+  @Test
+  public void testReadPartialStripeThreeChunks() throws IOException {
+    int chunkSize = repConfig.getEcChunkSize();
+    int blockLength = chunkSize * 3 - 1;
+
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+    // First buffer has only the blockLength, the other two will have no data.
+    dataBufs[0].limit(chunkSize);
+    dataBufs[1].limit(chunkSize);
+    dataBufs[2].limit(chunkSize - 1);
+    for (ByteBuffer b : dataBufs) {
+      randomFill(b);
+    }
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+    // We have a length that is less than a stripe, so chunks 1 and 2 are full.
+    // Block 1 is lost and needs recovered
+    // from the parity and padded blocks 2 and 3.
+
+    List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
+    // Two data missing
+    locations.add(createIndexMap(3, 4, 5));
+    // Two data missing
+    locations.add(createIndexMap(1, 4, 5));
+    // One data missing - the last one
+    locations.add(createIndexMap(1, 2, 5));
+    // One data and one parity missing
+    locations.add(createIndexMap(2, 3, 4));
+    // One data and one parity missing
+    locations.add(createIndexMap(1, 2, 4));
+
+    for (Map<DatanodeDetails, Integer> dnMap : locations) {
+      streamFactory = new TestBlockInputStreamFactory();
+      addDataStreamsToFactory(dataBufs, parity);
+      ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+
+      OmKeyLocationInfo keyInfo =
+          createKeyInfo(repConfig, blockLength, dnMap);
+      streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+      try (ECBlockReconstructedStripeInputStream ecb =
+          new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+              null, null, streamFactory)) {
+        int read = ecb.readStripe(bufs);
+        Assert.assertEquals(blockLength, read);
+        validateContents(dataBufs[0], bufs[0], 0, chunkSize);
+        validateContents(dataBufs[1], bufs[1], 0, chunkSize);
+        validateContents(dataBufs[2], bufs[2], 0, chunkSize - 1);
+        // Check the underlying streams have been advanced by 1 chunk:
+        for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+          Assert.assertEquals(0, bis.getRemaining());
+        }
+        Assert.assertEquals(ecb.getPos(), blockLength);
+        clearBuffers(bufs);
+        // A further read should give EOF
+        read = ecb.readStripe(bufs);
+        Assert.assertEquals(-1, read);
+      }
+    }
+  }
+
+  private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) 
{
+    List<ByteBuffer> dataStreams = new ArrayList<>();
+    for (ByteBuffer b : data) {
+      dataStreams.add(b);
+    }
+    for (ByteBuffer b : parity) {
+      dataStreams.add(b);
+    }
+    streamFactory.setBlockStreamData(dataStreams);
+  }
+
+  /**
+   * Validates that the data buffer has the same contents as the source buffer,
+   * starting the checks in the src at offset and for count bytes.
+   * @param src The source of the data
+   * @param data The data which should be checked against the source
+   * @param offset The starting point in the src buffer
+   * @param count How many bytes to check.
+   */
+  private void validateContents(ByteBuffer src, ByteBuffer data, int offset,
+      int count) {
+    byte[] srcArray = src.array();
+    Assert.assertEquals(count, data.remaining());
+    for (int i = offset; i < offset + count; i++) {
+      Assert.assertEquals("Element " + i, srcArray[i], data.get());
+    }
+    data.flip();
+  }
+
+  /**
+   * Returns a new map containing a random DatanodeDetails for each index in
+   * inputs.
+   * @param idxs A list of indexes to add to the map
+   * @return A map of DatanodeDetails to index.
+   */
+  private Map<DatanodeDetails, Integer> createIndexMap(int... idxs) {
+    Map<DatanodeDetails, Integer> map = new HashMap<>();
+    for (int i : idxs) {
+      map.put(MockDatanodeDetails.randomDatanodeDetails(), i);
+    }
+    return map;
+  }
+
+  /**
+   * Given a set of data buffers, generate the parity data for the inputs.
+   * @param data A set of data buffers
+   * @param ecConfig The ECReplicationConfig representing the scheme
+   * @return
+   * @throws IOException
+   */
+  private ByteBuffer[] generateParity(ByteBuffer[] data,
+      ECReplicationConfig ecConfig) throws IOException {
+    // First data buffer dictates the size
+    int cellSize = data[0].limit();
+    // Store the positions of the remaining data buffers so we can restore them
+    int[] dataLimits = new int[data.length];
+    for (int i=1; i<data.length; i++) {
+      dataLimits[i] = data[i].limit();
+      data[i].limit(cellSize);
+      zeroFill(data[i]);
+      data[i].flip();
+    }
+    ByteBuffer[] parity = new ByteBuffer[ecConfig.getParity()];
+    for (int i = 0; i < ecConfig.getParity(); i++) {
+      parity[i] = ByteBuffer.allocate(cellSize);
+    }
+    RawErasureEncoder encoder = CodecRegistry.getInstance()
+        .getCodecFactory(repConfig.getCodec().toString())
+        .createEncoder(repConfig);
+    encoder.encode(data, parity);
+
+    data[0].flip();
+    for (int i = 1; i < data.length; i++) {
+      data[i].limit(dataLimits[i]);
+      data[i].position(0);
+    }
+    return parity;
+  }
+
+  /**
+   * Fill the remaining space in a buffer random bytes.
+   * @param buf
+   */
+  private void randomFill(ByteBuffer buf) {
+    while (buf.hasRemaining()) {
+      buf.put((byte)ThreadLocalRandom.current().nextInt(255));
+    }
+    buf.flip();
+  }
+
+  /**
+   * Fill / Pad the remaining space in a buffer with zeros.
+   * @param buf
+   */
+  private void zeroFill(ByteBuffer buf) {
+    byte[] a = buf.array();
+    Arrays.fill(a, buf.position(), buf.limit(), (byte)0);
+    buf.position(buf.limit());
+  }
+
+  /**
+   * Return a list of num ByteBuffers of the given size.
+   * @param num Number of buffers to create
+   * @param size The size of each buffer
+   * @return
+   */
+  private ByteBuffer[] allocateBuffers(int num, int size) {
+    ByteBuffer[] bufs = new ByteBuffer[num];
+    for (int i = 0; i < num; i++) {
+      bufs[i] = ByteBuffer.allocate(size);
+    }
+    return bufs;
+  }
+
+  private int stripeSize() {
+    return stripeSize(repConfig);
+  }
+
+  private int stripeSize(ECReplicationConfig rconfig) {
+    return rconfig.getEcChunkSize() * rconfig.getData();
+  }
+
+  private void clearBuffers(ByteBuffer[] bufs) {
+    for (ByteBuffer b : bufs) {
+      b.clear();
+    }
+  }
+
+  private ByteBuffer[] allocateByteBuffers(ECReplicationConfig rConfig) {
+    ByteBuffer[] bufs = new ByteBuffer[repConfig.getData()];
+    for (int i = 0; i < bufs.length; i++) {
+      bufs[i] = ByteBuffer.allocate(rConfig.getEcChunkSize());
+    }
+    return bufs;
+  }
+
+  private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+      long blockLength, Map<DatanodeDetails, Integer> dnMap) {
+
+    Pipeline pipeline = Pipeline.newBuilder()
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setId(PipelineID.randomId())
+        .setNodes(new ArrayList<>(dnMap.keySet()))
+        .setReplicaIndexes(dnMap)
+        .setReplicationConfig(repConf)
+        .build();
+
+    OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
+        .setBlockID(new BlockID(1, 1))
+        .setLength(blockLength)
+        .setOffset(0)
+        .setPipeline(pipeline)
+        .setPartNumber(0)
+        .build();
+    return keyInfo;
+  }
+
+  private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+      int nodeCount, long blockLength) {
+    Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
+    for (int i = 0; i < nodeCount; i++) {
+      datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
+    }
+    return createKeyInfo(repConf, blockLength, datanodes);
+  }
+
+  private static class TestBlockInputStreamFactory implements
+      BlockInputStreamFactory {
+
+    private List<TestBlockInputStream> blockStreams = new ArrayList<>();
+    private List<ByteBuffer> blockStreamData;
+
+    private Pipeline currentPipeline;
+
+    public List<TestBlockInputStream> getBlockStreams() {
+      return blockStreams;
+    }
+
+    public void setBlockStreamData(List<ByteBuffer> bufs) {
+      this.blockStreamData = bufs;
+    }
+
+    public void setCurrentPipeline(Pipeline pipeline) {
+      this.currentPipeline = pipeline;
+    }
+
+    public BlockExtendedInputStream create(ReplicationConfig repConfig,
+        OmKeyLocationInfo blockInfo, Pipeline pipeline,
+        Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+        XceiverClientFactory xceiverFactory,
+        Function<BlockID, Pipeline> refreshFunction) {
+
+      int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
+      TestBlockInputStream stream = new TestBlockInputStream(
+          blockInfo.getBlockID(), blockInfo.getLength(),
+          blockStreamData.get(repInd -1));
+      blockStreams.add(stream);
+      return stream;
+    }
+  }
+
+  private static class TestBlockInputStream extends BlockExtendedInputStream {
+
+    private ByteBuffer data;
+    private boolean closed = false;
+    private BlockID blockID;
+    private long length;
+    private static final byte EOF = -1;
+
+    TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data) {
+      this.blockID = blockId;
+      this.length = blockLen;
+      this.data = data;
+      data.position(0);
+    }
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public BlockID getBlockID() {
+      return blockID;
+    }
+
+    @Override
+    public long getLength() {
+      return length;
+    }
+
+    @Override
+    public long getRemaining() {
+      return data.remaining();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len)
+        throws IOException {
+      return read(ByteBuffer.wrap(b, off, len));
+    }
+
+    @Override
+    public int read(ByteBuffer buf) throws IOException {
+      if (getRemaining() == 0) {
+        return EOF;
+      }
+      int toRead = Math.min(buf.remaining(), (int)getRemaining());
+      for (int i = 0; i < toRead; i++) {
+        buf.put(data.get());
+      }
+      return toRead;
+    };
+
+    @Override
+    protected int readWithStrategy(ByteReaderStrategy strategy) throws
+        IOException {
+      throw new IOException("Should not be called");
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+    }
+
+    @Override
+    public void unbuffer() {
+    }
+
+    @Override
+    public long getPos() {
+      return data.position();
+    }
+
+  }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to