This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 685442c  HDDS-6001. EC: Create ECBlockReconstructedInputStream to wrap 
ECBlockReconstructedStripeInputStream (#2848)
685442c is described below

commit 685442c6b84b1372854c2a93f1779258648ed4a8
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Mon Nov 29 10:49:46 2021 +0000

    HDDS-6001. EC: Create ECBlockReconstructedInputStream to wrap 
ECBlockReconstructedStripeInputStream (#2848)
---
 .../client/io/ECBlockReconstructedInputStream.java | 205 ++++++++++
 .../ozone/client/rpc/read/ECStreamTestUtil.java    | 339 +++++++++++++++++
 .../client/rpc/read/TestECBlockInputStream.java    |  82 ++--
 .../read/TestECBlockReconstructedInputStream.java  | 310 ++++++++++++++++
 .../TestECBlockReconstructedStripeInputStream.java | 413 +++++----------------
 5 files changed, 972 insertions(+), 377 deletions(-)

diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
new file mode 100644
index 0000000..6538458
--- /dev/null
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Input stream which wraps a ECBlockReconstructedStripeInputStream to allow
+ * a EC Block to be read via the traditional InputStream read methods.
+ */
+public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
+
+  private ECReplicationConfig repConfig;
+  private ECBlockReconstructedStripeInputStream stripeReader;
+  private ByteBuffer[] bufs;
+  private boolean closed = false;
+
+  private long position = 0;
+
+  public ECBlockReconstructedInputStream(ECReplicationConfig repConfig,
+      ECBlockReconstructedStripeInputStream stripeReader) {
+    this.repConfig = repConfig;
+    this.stripeReader = stripeReader;
+
+    allocateBuffers();
+  }
+
+  @Override
+  public synchronized BlockID getBlockID() {
+    return stripeReader.getBlockID();
+  }
+
+  @Override
+  public synchronized long getRemaining() {
+    return getLength() - position;
+  }
+
+  @Override
+  public synchronized long getLength() {
+    return stripeReader.getLength();
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len)
+      throws IOException {
+    return read(ByteBuffer.wrap(b, off, len));
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    ensureNotClosed();
+    if (!hasRemaining()) {
+      return EOF;
+    }
+    int totalRead = 0;
+    while (buf.hasRemaining() && getRemaining() > 0) {
+      ByteBuffer b = selectNextBuffer();
+      if (b == null) {
+        // This should not happen, so if it does abort.
+        throw new IOException(getRemaining() + " bytes remaining but unable " +
+            "to select a buffer with data");
+      }
+      long read = readBufferToDest(b, buf);
+      totalRead += read;
+    }
+    return totalRead;
+  }
+
+  private void ensureNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException("The input stream is closed");
+    }
+  }
+
+  private ByteBuffer selectNextBuffer() throws IOException {
+    for (ByteBuffer b : bufs) {
+      if (b.hasRemaining()) {
+        return b;
+      }
+    }
+    // If we get here, then no buffer has any remaining, so we need to
+    // fill them.
+    long read = readStripe();
+    if (read == EOF) {
+      return null;
+    }
+    return selectNextBuffer();
+  }
+
+  private long readBufferToDest(ByteBuffer src, ByteBuffer dest) {
+    int initialRemaining = dest.remaining();
+    while(dest.hasRemaining() && src.hasRemaining()) {
+      dest.put(src.get());
+    }
+    int read = initialRemaining - dest.remaining();
+    position += read;
+    return read;
+  }
+
+  @Override
+  protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
+      throws IOException {
+    throw new IOException("Not Implemented");
+  }
+
+  @Override
+  public synchronized void unbuffer() {
+    stripeReader.unbuffer();
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    return position;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    stripeReader.close();
+    closed = true;
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    ensureNotClosed();
+    if (pos < 0 || pos >= getLength()) {
+      if (pos == 0) {
+        // It is possible for length and pos to be zero in which case
+        // seek should return instead of throwing exception
+        return;
+      }
+      throw new EOFException(
+          "EOF encountered at pos: " + pos + " for block: " + getBlockID());
+    }
+    long stripeSize = (long)repConfig.getEcChunkSize() * repConfig.getData();
+    long stripeNum = pos / stripeSize;
+    int partial = (int)(pos % stripeSize);
+    // Seek the stripe reader to the beginning of the new current stripe
+    stripeReader.seek(stripeNum * stripeSize);
+    // Now reload the data buffers and adjust their position to the partial
+    // stripe offset.
+    readAndSeekStripe(partial);
+    position = pos;
+  }
+
+  private void readAndSeekStripe(int offset) throws IOException {
+    readStripe();
+    if (offset == 0) {
+      return;
+    }
+    for (ByteBuffer b : bufs) {
+      int newPos = Math.min(b.remaining(), offset);
+      b.position(newPos);
+      offset -= newPos;
+      if (offset == 0) {
+        break;
+      }
+    }
+  }
+
+  private long readStripe() throws IOException {
+    clearBuffers();
+    return stripeReader.readStripe(bufs);
+  }
+
+  private void allocateBuffers() {
+    bufs = new ByteBuffer[repConfig.getData()];
+    for (int i = 0; i < repConfig.getData(); i++) {
+      bufs[i] = ByteBuffer.allocate(repConfig.getEcChunkSize());
+      // Initially set the limit to 0 so there is no remaining space.
+      bufs[i].limit(0);
+    }
+  }
+
+  private void clearBuffers() {
+    for (ByteBuffer b : bufs) {
+      b.clear();
+    }
+  }
+
+  private boolean hasRemaining() {
+    return getRemaining() > 0;
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
new file mode 100644
index 0000000..aabec31
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ozone.erasurecode.CodecRegistry;
+import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.ratis.util.Preconditions;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SplittableRandom;
+import java.util.function.Function;
+
+/**
+ * Utility class providing methods useful in EC tests.
+ */
+public final class ECStreamTestUtil {
+
+  private ECStreamTestUtil() {
+  }
+
+  public static OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+      long blockLength, Map<DatanodeDetails, Integer> dnMap) {
+
+    Pipeline pipeline = Pipeline.newBuilder()
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setId(PipelineID.randomId())
+        .setNodes(new ArrayList<>(dnMap.keySet()))
+        .setReplicaIndexes(dnMap)
+        .setReplicationConfig(repConf)
+        .build();
+
+    OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
+        .setBlockID(new BlockID(1, 1))
+        .setLength(blockLength)
+        .setOffset(0)
+        .setPipeline(pipeline)
+        .setPartNumber(0)
+        .build();
+    return keyInfo;
+  }
+
+  public static OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+      int nodeCount, long blockLength) {
+    Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
+    for (int i = 0; i < nodeCount; i++) {
+      datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
+    }
+    return createKeyInfo(repConf, blockLength, datanodes);
+  }
+
+  /**
+   * Fill / Pad the remaining space in a buffer with zeros.
+   * @param buf
+   */
+  public static void zeroFill(ByteBuffer buf) {
+    byte[] a = buf.array();
+    Arrays.fill(a, buf.position(), buf.limit(), (byte)0);
+    buf.position(buf.limit());
+  }
+
+  /**
+   * Given a List of ByteBuffers, write length of random bytes from the given
+   * Random generator to the byte buffers. The data is striped across the
+   * buffers in stripeSize chunks.
+   * When the length of data has been written, the buffer limits are set to
+   * their final positions.
+   *
+   * @param bufs The list of buffers to fill with random data
+   * @param stripeSize The stripe size to use
+   * @param rand The random generator to use
+   * @param length The length of data to write.
+   */
+  public static void randomFill(ByteBuffer[] bufs, int stripeSize,
+      SplittableRandom rand, int length) {
+    Preconditions.assertTrue(totalSpaceAvailable(bufs) >= length);
+    int remaining = length;
+    while(remaining > 0) {
+      for (ByteBuffer b : bufs) {
+        int toWrite = Math.min(stripeSize, remaining);
+        for (int i = 0; i < toWrite; i++) {
+          b.put((byte) rand.nextInt(255));
+        }
+        remaining -= toWrite;
+      }
+    }
+    // Set the buffer limits to the final position
+    for (ByteBuffer b : bufs) {
+      b.limit(b.position());
+    }
+  }
+
+  private static int totalSpaceAvailable(ByteBuffer[] bufs) {
+    int space = 0;
+    for (ByteBuffer b : bufs) {
+      space += b.remaining();
+    }
+    return space;
+  }
+
+  /**
+   * Given a buffer which has data loaded, flip the buffer and ensure it 
matches
+   * byte for byte the next series of bytes from the Random generator.
+   * @param b Byte Buffers containing data
+   * @param rand The random generator
+   */
+  public static void assertBufferMatches(ByteBuffer b, SplittableRandom rand) {
+    b.flip();
+    int i=0;
+    while (b.hasRemaining()) {
+      i++;
+      Assert.assertEquals("Failed on iteration " + i,
+          (byte)rand.nextInt(255), b.get());
+    }
+  }
+
+  /**
+   * Given a List of ByteBuffers and the RepConfig, encode the parity buffers
+   * from the data buffers. The data buffers should be passed "as is" after
+   * reading data. That is, the position will be at the last data byte read in
+   * or the buffer limit.
+   * The data buffers and parity will be returned "ready to read" with the
+   * position reset to zero.
+   * @param data List of data buffers
+   * @param ecConfig The ECReplicationConfig.
+   * @return List of encoded parity buffers.
+   * @throws IOException
+   */
+  public static ByteBuffer[] generateParity(ByteBuffer[] data,
+      ECReplicationConfig ecConfig) throws IOException {
+    // First data buffer dictates the size
+    int cellSize = data[0].limit();
+    data[0].flip();
+    // Store the positions of the remaining data buffers so we can restore them
+    int[] dataLimits = new int[data.length];
+    for (int i = 1; i < data.length; i++) {
+      dataLimits[i] = data[i].limit();
+      data[i].limit(cellSize);
+      zeroFill(data[i]);
+      data[i].flip();
+    }
+    ByteBuffer[] parity = new ByteBuffer[ecConfig.getParity()];
+    for (int i = 0; i < ecConfig.getParity(); i++) {
+      parity[i] = ByteBuffer.allocate(cellSize);
+    }
+    RawErasureEncoder encoder = CodecRegistry.getInstance()
+        .getCodecFactory(ecConfig.getCodec().toString())
+        .createEncoder(ecConfig);
+    encoder.encode(data, parity);
+
+    data[0].flip();
+    for (int i = 1; i < data.length; i++) {
+      data[i].limit(dataLimits[i]);
+      data[i].position(0);
+    }
+    return parity;
+  }
+
+  /**
+   * Returns a new map containing a random DatanodeDetails for each index in
+   * inputs.
+   * @param idxs A list of indexes to add to the map
+   * @return A map of DatanodeDetails to index.
+   */
+  public static Map<DatanodeDetails, Integer> createIndexMap(int... idxs) {
+    Map<DatanodeDetails, Integer> map = new HashMap<>();
+    for (int i : idxs) {
+      map.put(MockDatanodeDetails.randomDatanodeDetails(), i);
+    }
+    return map;
+  }
+
+
+  /**
+   * A stream factory which can be used in tests to provide TestBlockStream
+   * instances.
+   */
+  public static class TestBlockInputStreamFactory implements
+      BlockInputStreamFactory {
+
+    private List<TestBlockInputStream> blockStreams = new ArrayList<>();
+    private List<ByteBuffer> blockStreamData;
+
+    private Pipeline currentPipeline;
+
+    public List<ECStreamTestUtil.TestBlockInputStream> getBlockStreams() {
+      return blockStreams;
+    }
+
+    public void setBlockStreamData(List<ByteBuffer> bufs) {
+      this.blockStreamData = bufs;
+    }
+
+    public void setCurrentPipeline(Pipeline pipeline) {
+      this.currentPipeline = pipeline;
+    }
+
+    public BlockExtendedInputStream create(ReplicationConfig repConfig,
+        OmKeyLocationInfo blockInfo, Pipeline pipeline,
+        Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+        XceiverClientFactory xceiverFactory,
+        Function<BlockID, Pipeline> refreshFunction) {
+
+      int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
+      TestBlockInputStream stream = new TestBlockInputStream(
+          blockInfo.getBlockID(), blockInfo.getLength(),
+          blockStreamData.get(repInd - 1));
+      blockStreams.add(stream);
+      return stream;
+    }
+  }
+
+  /**
+   * A block stream that returns data from the provided ByteBuffer. Intended to
+   * be used in tests, rather than reading from a real block stream.
+   */
+  public static class TestBlockInputStream extends BlockExtendedInputStream {
+
+    private ByteBuffer data;
+    private boolean closed = false;
+    private BlockID blockID;
+    private long length;
+    private boolean shouldError = false;
+    private static final byte EOF = -1;
+
+    TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data) {
+      this.blockID = blockId;
+      this.length = blockLen;
+      this.data = data;
+      data.position(0);
+    }
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    public void setShouldError(boolean val) {
+      shouldError = val;
+    }
+
+    @Override
+    public BlockID getBlockID() {
+      return blockID;
+    }
+
+    @Override
+    public long getLength() {
+      return length;
+    }
+
+    @Override
+    public long getRemaining() {
+      return data.remaining();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len)
+        throws IOException {
+      return read(ByteBuffer.wrap(b, off, len));
+    }
+
+    @Override
+    public int read(ByteBuffer buf) throws IOException {
+      if (shouldError) {
+        throw new IOException("Simulated error reading block");
+      }
+      if (getRemaining() == 0) {
+        return EOF;
+      }
+      int toRead = Math.min(buf.remaining(), (int)getRemaining());
+      for (int i = 0; i < toRead; i++) {
+        buf.put(data.get());
+      }
+      return toRead;
+    };
+
+    @Override
+    protected int readWithStrategy(ByteReaderStrategy strategy) throws
+        IOException {
+      throw new IOException("Should not be called");
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+    }
+
+    @Override
+    public void unbuffer() {
+    }
+
+    @Override
+    public long getPos() {
+      return data.position();
+    }
+
+    @Override
+    public void seek(long pos) {
+      data.position((int)pos);
+    }
+
+  }
+
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
index 1098eb4..5a29626 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
@@ -65,14 +64,15 @@ public class TestECBlockInputStream {
   @Test
   public void testSufficientLocations() {
     // EC-3-2, 5MB block, so all 3 data locations are needed
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    OmKeyLocationInfo keyInfo = ECStreamTestUtil
+        .createKeyInfo(repConfig, 5, 5 * ONEMB);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
       Assert.assertTrue(ecb.hasSufficientLocations());
     }
 
     // EC-3-2, very large block, so all 3 data locations are needed
-    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 5000 * ONEMB);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
       Assert.assertTrue(ecb.hasSufficientLocations());
@@ -82,7 +82,7 @@ public class TestECBlockInputStream {
 
     // EC-3-2, 1 byte short of 1MB with 1 location
     dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
-    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB - 1, dnMap);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
       Assert.assertTrue(ecb.hasSufficientLocations());
@@ -92,7 +92,7 @@ public class TestECBlockInputStream {
     // locations.
     dnMap.clear();
     dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
-    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5 * ONEMB, dnMap);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
       Assert.assertFalse(ecb.hasSufficientLocations());
@@ -104,7 +104,7 @@ public class TestECBlockInputStream {
     dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
     dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
     dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
-    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5 * ONEMB, dnMap);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
       Assert.assertFalse(ecb.hasSufficientLocations());
@@ -115,7 +115,8 @@ public class TestECBlockInputStream {
   public void testCorrectBlockSizePassedToBlockStreamLessThanCell()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB - 100);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -131,7 +132,8 @@ public class TestECBlockInputStream {
   public void testCorrectBlockSizePassedToBlockStreamTwoCells()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB + 100);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -146,7 +148,8 @@ public class TestECBlockInputStream {
   public void testCorrectBlockSizePassedToBlockStreamThreeCells()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -162,7 +165,8 @@ public class TestECBlockInputStream {
   public void 
testCorrectBlockSizePassedToBlockStreamThreeFullAndPartialStripe()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -178,7 +182,8 @@ public class TestECBlockInputStream {
   public void testCorrectBlockSizePassedToBlockStreamSingleFullCell()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -192,7 +197,8 @@ public class TestECBlockInputStream {
   public void testCorrectBlockSizePassedToBlockStreamSeveralFullCells()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 9 * ONEMB);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 9 * ONEMB);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -206,7 +212,8 @@ public class TestECBlockInputStream {
 
   @Test
   public void testSimpleRead() throws IOException {
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
 
@@ -224,7 +231,8 @@ public class TestECBlockInputStream {
 
   @Test
   public void testReadPastEOF() throws IOException {
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 50);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 50);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
 
@@ -242,7 +250,8 @@ public class TestECBlockInputStream {
     // EC-3-2, 5MB block, so all 3 data locations are needed
     repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
         100);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
 
@@ -276,7 +285,8 @@ public class TestECBlockInputStream {
   public void testSeekPastBlockLength() throws IOException {
     repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
         ONEMB);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 100);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
       ecb.seek(1000);
@@ -287,7 +297,8 @@ public class TestECBlockInputStream {
   public void testSeekToLength() throws IOException {
     repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
         ONEMB);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 100);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
       ecb.seek(100);
@@ -298,7 +309,8 @@ public class TestECBlockInputStream {
   public void testSeekToLengthZeroLengthBlock() throws IOException {
     repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
         ONEMB);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 0);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 0);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
       ecb.seek(0);
@@ -311,7 +323,8 @@ public class TestECBlockInputStream {
   public void testSeekToValidPosition() throws IOException {
     repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
         ONEMB);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
       ecb.seek(ONEMB - 1);
@@ -342,37 +355,6 @@ public class TestECBlockInputStream {
     }
   }
 
-  private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
-      long blockLength, Map<DatanodeDetails, Integer> dnMap) {
-
-    Pipeline pipeline = Pipeline.newBuilder()
-        .setState(Pipeline.PipelineState.CLOSED)
-        .setId(PipelineID.randomId())
-        .setNodes(new ArrayList<>(dnMap.keySet()))
-        .setReplicaIndexes(dnMap)
-        .setReplicationConfig(repConf)
-        .build();
-
-    OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
-        .setBlockID(new BlockID(1, 1))
-        .setLength(blockLength)
-        .setOffset(0)
-        .setPipeline(pipeline)
-        .setPartNumber(0)
-        .build();
-    return keyInfo;
-  }
-
-  private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
-      int nodeCount, long blockLength) {
-    Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
-    for (int i = 0; i < nodeCount; i++) {
-      datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
-    }
-
-    return createKeyInfo(repConf, blockLength, datanodes);
-  }
-
   private static class TestBlockInputStreamFactory implements
       BlockInputStreamFactory {
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
new file mode 100644
index 0000000..c1c0574
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.client.io.ECBlockReconstructedInputStream;
+import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SplittableRandom;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static 
org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.generateParity;
+
+/**
+ * Test for the ECBlockReconstructedInputStream class.
+ */
+public class TestECBlockReconstructedInputStream {
+
+  private ECReplicationConfig repConfig;
+  private ECStreamTestUtil.TestBlockInputStreamFactory streamFactory;
+  private long randomSeed;
+  private ThreadLocalRandom random = ThreadLocalRandom.current();
+  private SplittableRandom dataGenerator;
+
+  @Before
+  public void setup() throws IOException {
+    repConfig = new ECReplicationConfig(3, 2);
+    streamFactory = new ECStreamTestUtil.TestBlockInputStreamFactory();
+
+    randomSeed = random.nextLong();
+    dataGenerator = new SplittableRandom(randomSeed);
+  }
+
+  private ECBlockReconstructedStripeInputStream createStripeInputStream(
+      Map<DatanodeDetails, Integer> dnMap, long blockLength) {
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+    streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+    return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+        null, null, streamFactory);
+  }
+
+  @Test
+  public void testBlockLengthReturned() throws IOException {
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(4, 5);
+    try(ECBlockReconstructedStripeInputStream stripeStream
+        = createStripeInputStream(dnMap, 12345L)) {
+      try (ECBlockReconstructedInputStream stream =
+          new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+        Assert.assertEquals(12345L, stream.getLength());
+      }
+    }
+  }
+
+  @Test
+  public void testBlockIDReturned() throws IOException {
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 4, 5);
+    try(ECBlockReconstructedStripeInputStream stripeStream
+            = createStripeInputStream(dnMap, 12345L)) {
+      try (ECBlockReconstructedInputStream stream =
+               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+        Assert.assertEquals(new BlockID(1, 1), stream.getBlockID());
+      }
+    }
+  }
+
+  @Test
+  public void testReadDataByteBufferMultipleStripes() throws IOException {
+    int readBufferSize = random.nextInt(4096);
+    // 3 stripes and a partial chunk
+    int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+        + repConfig.getEcChunkSize() - 1;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+        repConfig.getEcChunkSize() * 4);
+    ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+        dataGenerator, blockLength);
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+    addDataStreamsToFactory(dataBufs, parity);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+    try(ECBlockReconstructedStripeInputStream stripeStream
+            = createStripeInputStream(dnMap, blockLength)) {
+      try (ECBlockReconstructedInputStream stream =
+               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+        ByteBuffer b = ByteBuffer.allocate(readBufferSize);
+        int totalRead = 0;
+        dataGenerator = new SplittableRandom(randomSeed);
+        while (totalRead < blockLength) {
+          int expectedRead = Math.min(blockLength - totalRead, readBufferSize);
+          long read = stream.read(b);
+          totalRead += read;
+          Assert.assertEquals(expectedRead, read);
+          ECStreamTestUtil.assertBufferMatches(b, dataGenerator);
+          b.clear();
+        }
+        // Next read should be EOF
+        b.clear();
+        long read = stream.read(b);
+        Assert.assertEquals(-1, read);
+      }
+    }
+  }
+
+  @Test
+  public void testReadDataByteBufferUnderBufferSize() throws IOException {
+    int readBufferSize = 4096;
+    // Small block with less data that the read size
+    int blockLength = 1024;
+
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 1024);
+    ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+        dataGenerator, blockLength);
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+    addDataStreamsToFactory(dataBufs, parity);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+    try(ECBlockReconstructedStripeInputStream stripeStream
+            = createStripeInputStream(dnMap, blockLength)) {
+      try (ECBlockReconstructedInputStream stream =
+               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+        ByteBuffer b = ByteBuffer.allocate(readBufferSize);
+        dataGenerator = new SplittableRandom(randomSeed);
+        long read = stream.read(b);
+        Assert.assertEquals(blockLength, read);
+        ECStreamTestUtil.assertBufferMatches(b, dataGenerator);
+        b.clear();
+        // Next read should be EOF
+        read = stream.read(b);
+        Assert.assertEquals(-1, read);
+      }
+    }
+  }
+
+  @Test
+  public void testReadByteAtATime() throws IOException {
+    // 3 stripes and a partial chunk
+    int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+        + repConfig.getEcChunkSize() - 1;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+        repConfig.getEcChunkSize() * 4);
+    ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+        dataGenerator, blockLength);
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+    addDataStreamsToFactory(dataBufs, parity);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+    try(ECBlockReconstructedStripeInputStream stripeStream
+            = createStripeInputStream(dnMap, blockLength)) {
+      try (ECBlockReconstructedInputStream stream =
+               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+
+        dataGenerator = new SplittableRandom(randomSeed);
+        int totalRead = 0;
+        while (true) {
+          int val = stream.read();
+          if (val == -1) {
+            break;
+          }
+          Assert.assertEquals(dataGenerator.nextInt(255), val);
+          totalRead += 1;
+        }
+        Assert.assertEquals(blockLength, totalRead);
+      }
+    }
+  }
+
+  @Test
+  public void testReadByteBuffer() throws IOException {
+    // 3 stripes and a partial chunk
+    int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+        + repConfig.getEcChunkSize() - 1;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+        repConfig.getEcChunkSize() * 4);
+    ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+        dataGenerator, blockLength);
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+    addDataStreamsToFactory(dataBufs, parity);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+    byte[] buf = new byte[1024];
+    try(ECBlockReconstructedStripeInputStream stripeStream
+            = createStripeInputStream(dnMap, blockLength)) {
+      try (ECBlockReconstructedInputStream stream =
+               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+        int totalRead = 0;
+        dataGenerator = new SplittableRandom(randomSeed);
+        while (totalRead < blockLength) {
+          int expectedRead = Math.min(blockLength - totalRead, 1024);
+          long read = stream.read(buf, 0, buf.length);
+          totalRead += read;
+          Assert.assertEquals(expectedRead, read);
+          ECStreamTestUtil.assertBufferMatches(
+              ByteBuffer.wrap(buf, 0, (int)read), dataGenerator);
+        }
+        // Next read should be EOF
+        long read = stream.read(buf, 0, buf.length);
+        Assert.assertEquals(-1, read);
+      }
+    }
+  }
+
+  @Test
+  public void testSeek() throws IOException {
+    int readBufferSize = repConfig.getEcChunkSize() + 1024;
+    // 3 stripes and a partial chunk
+    int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+        + repConfig.getEcChunkSize() - 1;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+        repConfig.getEcChunkSize() * 4);
+    ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+        dataGenerator, blockLength);
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+    addDataStreamsToFactory(dataBufs, parity);
+
+    Map<DatanodeDetails, Integer> dnMap
+        = ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+    try(ECBlockReconstructedStripeInputStream stripeStream
+            = createStripeInputStream(dnMap, blockLength)) {
+      try (ECBlockReconstructedInputStream stream =
+               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+        ByteBuffer b = ByteBuffer.allocate(readBufferSize);
+
+        int seekPosition = 0;
+        for (int i = 0; i < 100; i++) {
+          resetAndAdvanceDataGenerator(seekPosition);
+          long expectedRead = Math.min(stream.getRemaining(), readBufferSize);
+          long read = stream.read(b);
+          Assert.assertEquals(expectedRead, read);
+          ECStreamTestUtil.assertBufferMatches(b, dataGenerator);
+          seekPosition = random.nextInt(blockLength);
+          stream.seek(seekPosition);
+          b.clear();
+        }
+        // Seeking beyond EOF should give an error
+        try {
+          stream.seek(blockLength + 1);
+          Assert.fail("Seek beyond EOF should error");
+        } catch (IOException e) {
+          // expected
+        }
+      }
+    }
+  }
+
+  private void resetAndAdvanceDataGenerator(long position) {
+    dataGenerator = new SplittableRandom(randomSeed);
+    for (long i = 0; i < position; i++) {
+      dataGenerator.nextInt(255);
+    }
+  }
+
+
+
+  /**
+   * Return a list of num ByteBuffers of the given size.
+   * @param num Number of buffers to create
+   * @param size The size of each buffer
+   * @return
+   */
+  private ByteBuffer[] allocateBuffers(int num, int size) {
+    ByteBuffer[] bufs = new ByteBuffer[num];
+    for (int i = 0; i < num; i++) {
+      bufs[i] = ByteBuffer.allocate(size);
+    }
+    return bufs;
+  }
+
+  private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) 
{
+    List<ByteBuffer> dataStreams = new ArrayList<>();
+    for (ByteBuffer b : data) {
+      dataStreams.add(b);
+    }
+    for (ByteBuffer b : parity) {
+      dataStreams.add(b);
+    }
+    streamFactory.setBlockStreamData(dataStreams);
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 33fb33a..8b1d460 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -17,25 +17,15 @@
  */
 package org.apache.hadoop.ozone.client.rpc.read;
 
-import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
-import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
 import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
 import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
 import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
+import 
org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStreamFactory;
+import 
org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStream;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.security.token.Token;
-import org.apache.ozone.erasurecode.CodecRegistry;
-import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,12 +33,13 @@ import org.junit.Test;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.SplittableRandom;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Function;
+
+import static 
org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.generateParity;
 
 /**
  * Test for the ECBlockReconstructedStripeInputStream.
@@ -58,19 +49,26 @@ public class TestECBlockReconstructedStripeInputStream {
   private static final int ONEMB = 1024 * 1024;
 
   private ECReplicationConfig repConfig;
-  private TestBlockInputStreamFactory streamFactory;
+  private ECStreamTestUtil.TestBlockInputStreamFactory streamFactory;
+  private long randomSeed;
+  private ThreadLocalRandom random = ThreadLocalRandom.current();
+  private SplittableRandom dataGen;
 
   @Before
   public void setup() {
     repConfig = new ECReplicationConfig(3, 2,
         ECReplicationConfig.EcCodec.RS, ONEMB);
-    streamFactory = new TestBlockInputStreamFactory();
+    streamFactory = new ECStreamTestUtil.TestBlockInputStreamFactory();
+
+    randomSeed = random.nextLong();
+    dataGen = new SplittableRandom(randomSeed);
   }
 
   @Test
   public void testSufficientLocations() {
     // One chunk, only 1 location.
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 1, ONEMB);
+    OmKeyLocationInfo keyInfo = ECStreamTestUtil
+        .createKeyInfo(repConfig, 1, ONEMB);
     try (ECBlockInputStream ecb =
         new ECBlockReconstructedStripeInputStream(repConfig,
         keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
@@ -80,17 +78,18 @@ public class TestECBlockReconstructedStripeInputStream {
     Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
 
     // Two Chunks, but missing data block 2.
-    dnMap = createIndexMap(1, 4, 5);
-    keyInfo = createKeyInfo(repConfig, ONEMB * 2, dnMap);
+    dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
+    keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 2, dnMap);
     try (ECBlockInputStream ecb =
         new ECBlockReconstructedStripeInputStream(repConfig,
-        keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+        keyInfo, true, null, null,
+            new ECStreamTestUtil.TestBlockInputStreamFactory())) {
       Assert.assertTrue(ecb.hasSufficientLocations());
     }
 
     // Three Chunks, but missing data block 2 and 3.
-    dnMap = createIndexMap(1, 4, 5);
-    keyInfo = createKeyInfo(repConfig, ONEMB * 3, dnMap);
+    dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
+    keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
     try (ECBlockInputStream ecb =
         new ECBlockReconstructedStripeInputStream(repConfig,
         keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
@@ -98,8 +97,8 @@ public class TestECBlockReconstructedStripeInputStream {
     }
 
     // Three Chunks, but missing data block 2 and 3 and parity 1.
-    dnMap = createIndexMap(1, 4);
-    keyInfo = createKeyInfo(repConfig, ONEMB * 3, dnMap);
+    dnMap = ECStreamTestUtil.createIndexMap(1, 4);
+    keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
     try (ECBlockInputStream ecb =
         new ECBlockReconstructedStripeInputStream(repConfig,
         keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
@@ -112,33 +111,32 @@ public class TestECBlockReconstructedStripeInputStream {
     // Generate the input data for 3 full stripes and generate the parity.
     int chunkSize = repConfig.getEcChunkSize();
     int partialStripeSize = chunkSize * 2 - 1;
+    int blockLength = chunkSize * repConfig.getData() * 3 + partialStripeSize;
     ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 * 
chunkSize);
-    dataBufs[1].limit(4 * chunkSize - 1);
-    dataBufs[2].limit(3 * chunkSize);
-    for (ByteBuffer b : dataBufs) {
-      randomFill(b);
-    }
+    ECStreamTestUtil.randomFill(dataBufs, chunkSize, dataGen, blockLength);
+
     ByteBuffer[] parity = generateParity(dataBufs, repConfig);
 
     List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
     // Two data missing
-    locations.add(createIndexMap(1, 4, 5));
+    locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
     // One data missing
-    locations.add(createIndexMap(1, 2, 4, 5));
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4, 5));
     // Two data missing including first
-    locations.add(createIndexMap(2, 4, 5));
+    locations.add(ECStreamTestUtil.createIndexMap(2, 4, 5));
     // One data and one parity missing
-    locations.add(createIndexMap(2, 3, 4));
+    locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
 
     for (Map<DatanodeDetails, Integer> dnMap : locations) {
       streamFactory = new TestBlockInputStreamFactory();
       addDataStreamsToFactory(dataBufs, parity);
 
-      OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+      OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
           stripeSize() * 3 + partialStripeSize, dnMap);
       streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
       ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+      dataGen = new SplittableRandom(randomSeed);
       try (ECBlockReconstructedStripeInputStream ecb =
           new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
               null, null, streamFactory)) {
@@ -146,7 +144,7 @@ public class TestECBlockReconstructedStripeInputStream {
         for (int i = 0; i < 3; i++) {
           int read = ecb.readStripe(bufs);
           for (int j = 0; j < bufs.length; j++) {
-            validateContents(dataBufs[j], bufs[j], i * chunkSize, chunkSize);
+            ECStreamTestUtil.assertBufferMatches(bufs[j], dataGen);
           }
           Assert.assertEquals(stripeSize(), read);
 
@@ -161,8 +159,8 @@ public class TestECBlockReconstructedStripeInputStream {
         // The next read is a partial stripe
         int read = ecb.readStripe(bufs);
         Assert.assertEquals(partialStripeSize, read);
-        validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
-        validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
+        ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+        ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
         Assert.assertEquals(0, bufs[2].remaining());
         Assert.assertEquals(0, bufs[2].position());
 
@@ -178,13 +176,8 @@ public class TestECBlockReconstructedStripeInputStream {
   public void testReadPartialStripe() throws IOException {
     int blockLength = repConfig.getEcChunkSize() - 1;
     ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
-    // First buffer has only the blockLength, the other two will have no data.
-    dataBufs[0].limit(blockLength);
-    dataBufs[1].limit(0);
-    dataBufs[2].limit(0);
-    for (ByteBuffer b : dataBufs) {
-      randomFill(b);
-    }
+    ECStreamTestUtil
+        .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, 
blockLength);
     ByteBuffer[] parity = generateParity(dataBufs, repConfig);
     addDataStreamsToFactory(dataBufs, parity);
 
@@ -192,16 +185,18 @@ public class TestECBlockReconstructedStripeInputStream {
     // We have a length that is less than a single chunk, so blocks 2 and 3
     // are padding and will not be present. Block 1 is lost and needs recovered
     // from the parity and padded blocks 2 and 3.
-    Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(4, 5);
     OmKeyLocationInfo keyInfo =
-        createKeyInfo(repConfig, blockLength, dnMap);
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+    dataGen = new SplittableRandom(randomSeed);
     try (ECBlockReconstructedStripeInputStream ecb =
         new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
             null, null, streamFactory)) {
       int read = ecb.readStripe(bufs);
       Assert.assertEquals(blockLength, read);
-      validateContents(dataBufs[0], bufs[0], 0, blockLength);
+      ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
       Assert.assertEquals(0, bufs[1].remaining());
       Assert.assertEquals(0, bufs[1].position());
       Assert.assertEquals(0, bufs[2].remaining());
@@ -224,13 +219,8 @@ public class TestECBlockReconstructedStripeInputStream {
     int blockLength = chunkSize * 2 - 1;
 
     ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
-    // First buffer has only the blockLength, the other two will have no data.
-    dataBufs[0].limit(chunkSize);
-    dataBufs[1].limit(chunkSize - 1);
-    dataBufs[2].limit(0);
-    for (ByteBuffer b : dataBufs) {
-      randomFill(b);
-    }
+    ECStreamTestUtil
+        .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, 
blockLength);
     ByteBuffer[] parity = generateParity(dataBufs, repConfig);
     addDataStreamsToFactory(dataBufs, parity);
 
@@ -238,17 +228,19 @@ public class TestECBlockReconstructedStripeInputStream {
     // We have a length that is less than a single chunk, so blocks 2 and 3
     // are padding and will not be present. Block 1 is lost and needs recovered
     // from the parity and padded blocks 2 and 3.
-    Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(4, 5);
     OmKeyLocationInfo keyInfo =
-        createKeyInfo(repConfig, blockLength, dnMap);
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+    dataGen = new SplittableRandom(randomSeed);
     try (ECBlockReconstructedStripeInputStream ecb =
         new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
             null, null, streamFactory)) {
       int read = ecb.readStripe(bufs);
       Assert.assertEquals(blockLength, read);
-      validateContents(dataBufs[0], bufs[0], 0, chunkSize);
-      validateContents(dataBufs[1], bufs[1], 0, chunkSize - 1);
+      ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+      ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
       Assert.assertEquals(0, bufs[2].remaining());
       Assert.assertEquals(0, bufs[2].position());
       // Check the underlying streams have been advanced by 1 chunk:
@@ -269,13 +261,8 @@ public class TestECBlockReconstructedStripeInputStream {
     int blockLength = chunkSize * 3 - 1;
 
     ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
-    // First buffer has only the blockLength, the other two will have no data.
-    dataBufs[0].limit(chunkSize);
-    dataBufs[1].limit(chunkSize);
-    dataBufs[2].limit(chunkSize - 1);
-    for (ByteBuffer b : dataBufs) {
-      randomFill(b);
-    }
+    ECStreamTestUtil
+        .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, 
blockLength);
     ByteBuffer[] parity = generateParity(dataBufs, repConfig);
 
     // We have a length that is less than a stripe, so chunks 1 and 2 are full.
@@ -284,15 +271,15 @@ public class TestECBlockReconstructedStripeInputStream {
 
     List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
     // Two data missing
-    locations.add(createIndexMap(3, 4, 5));
+    locations.add(ECStreamTestUtil.createIndexMap(3, 4, 5));
     // Two data missing
-    locations.add(createIndexMap(1, 4, 5));
+    locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
     // One data missing - the last one
-    locations.add(createIndexMap(1, 2, 5));
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 5));
     // One data and one parity missing
-    locations.add(createIndexMap(2, 3, 4));
+    locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
     // One data and one parity missing
-    locations.add(createIndexMap(1, 2, 4));
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4));
 
     for (Map<DatanodeDetails, Integer> dnMap : locations) {
       streamFactory = new TestBlockInputStreamFactory();
@@ -300,16 +287,17 @@ public class TestECBlockReconstructedStripeInputStream {
       ByteBuffer[] bufs = allocateByteBuffers(repConfig);
 
       OmKeyLocationInfo keyInfo =
-          createKeyInfo(repConfig, blockLength, dnMap);
+          ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
       streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+      dataGen = new SplittableRandom(randomSeed);
       try (ECBlockReconstructedStripeInputStream ecb =
           new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
               null, null, streamFactory)) {
         int read = ecb.readStripe(bufs);
         Assert.assertEquals(blockLength, read);
-        validateContents(dataBufs[0], bufs[0], 0, chunkSize);
-        validateContents(dataBufs[1], bufs[1], 0, chunkSize);
-        validateContents(dataBufs[2], bufs[2], 0, chunkSize - 1);
+        ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+        ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
+        ECStreamTestUtil.assertBufferMatches(bufs[2], dataGen);
         // Check the underlying streams have been advanced by 1 chunk:
         for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
           Assert.assertEquals(0, bis.getRemaining());
@@ -327,13 +315,8 @@ public class TestECBlockReconstructedStripeInputStream {
   public void testErrorThrownIfBlockNotLongEnough() throws IOException {
     int blockLength = repConfig.getEcChunkSize() - 1;
     ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
-    // First buffer has only the blockLength, the other two will have no data.
-    dataBufs[0].limit(blockLength);
-    dataBufs[1].limit(0);
-    dataBufs[2].limit(0);
-    for (ByteBuffer b : dataBufs) {
-      randomFill(b);
-    }
+    ECStreamTestUtil
+        .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, 
blockLength);
     ByteBuffer[] parity = generateParity(dataBufs, repConfig);
     addDataStreamsToFactory(dataBufs, parity);
 
@@ -345,9 +328,10 @@ public class TestECBlockReconstructedStripeInputStream {
     // We have a length that is less than a single chunk, so blocks 2 and 3
     // are padding and will not be present. Block 1 is lost and needs recovered
     // from the parity and padded blocks 2 and 3.
-    Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(4, 5);
     OmKeyLocationInfo keyInfo =
-        createKeyInfo(repConfig, blockLength, dnMap);
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
     try (ECBlockReconstructedStripeInputStream ecb =
              new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, 
true,
@@ -369,28 +353,25 @@ public class TestECBlockReconstructedStripeInputStream {
     int partialStripeSize = chunkSize * 2 - 1;
     int dataLength = stripeSize() * 3 + partialStripeSize;
     ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 * 
chunkSize);
-    dataBufs[1].limit(4 * chunkSize - 1);
-    dataBufs[2].limit(3 * chunkSize);
-    for (ByteBuffer b : dataBufs) {
-      randomFill(b);
-    }
+    ECStreamTestUtil
+        .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, dataLength);
     ByteBuffer[] parity = generateParity(dataBufs, repConfig);
 
     List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
     // Two data missing
-    locations.add(createIndexMap(1, 4, 5));
+    locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
     // One data missing
-    locations.add(createIndexMap(1, 2, 4, 5));
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4, 5));
     // Two data missing including first
-    locations.add(createIndexMap(2, 4, 5));
+    locations.add(ECStreamTestUtil.createIndexMap(2, 4, 5));
     // One data and one parity missing
-    locations.add(createIndexMap(2, 3, 4));
+    locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
 
     for (Map<DatanodeDetails, Integer> dnMap : locations) {
       streamFactory = new TestBlockInputStreamFactory();
       addDataStreamsToFactory(dataBufs, parity);
 
-      OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+      OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
           stripeSize() * 3 + partialStripeSize, dnMap);
       streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
@@ -429,7 +410,7 @@ public class TestECBlockReconstructedStripeInputStream {
 
         // seek to the start of stripe 3
         clearBuffers(bufs);
-        ecb.seek(stripeSize() * 2);
+        ecb.seek(stripeSize() * (long)2);
         read = ecb.readStripe(bufs);
         for (int j = 0; j < bufs.length; j++) {
           validateContents(dataBufs[j], bufs[j], 2 * chunkSize, chunkSize);
@@ -442,8 +423,9 @@ public class TestECBlockReconstructedStripeInputStream {
 
   @Test
   public void testSeekToPartialOffsetFails() {
-    Map<DatanodeDetails, Integer> dnMap = createIndexMap(1, 4, 5);
-    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 4, 5);
+    OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
         stripeSize() * 3, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
@@ -465,13 +447,12 @@ public class TestECBlockReconstructedStripeInputStream {
     // Generate the input data for 3 full stripes and generate the parity.
     int chunkSize = repConfig.getEcChunkSize();
     int partialStripeSize = chunkSize * 2 - 1;
+    int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+        + partialStripeSize;
     ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
         4 * chunkSize);
-    dataBufs[1].limit(4 * chunkSize - 1);
-    dataBufs[2].limit(3 * chunkSize);
-    for (ByteBuffer b : dataBufs) {
-      randomFill(b);
-    }
+    ECStreamTestUtil
+        .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, 
blockLength);
     ByteBuffer[] parity = generateParity(dataBufs, repConfig);
 
     List<List<Integer>> failLists = new ArrayList<>();
@@ -487,8 +468,9 @@ public class TestECBlockReconstructedStripeInputStream {
       addDataStreamsToFactory(dataBufs, parity);
 
       // Data block index 3 is missing and needs recovered initially.
-      Map<DatanodeDetails, Integer> dnMap = createIndexMap(1, 2, 4, 5);
-      OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+      Map<DatanodeDetails, Integer> dnMap =
+          ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+      OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
           stripeSize() * 3 + partialStripeSize, dnMap);
       streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
@@ -572,77 +554,6 @@ public class TestECBlockReconstructedStripeInputStream {
   }
 
   /**
-   * Returns a new map containing a random DatanodeDetails for each index in
-   * inputs.
-   * @param idxs A list of indexes to add to the map
-   * @return A map of DatanodeDetails to index.
-   */
-  private Map<DatanodeDetails, Integer> createIndexMap(int... idxs) {
-    Map<DatanodeDetails, Integer> map = new HashMap<>();
-    for (int i : idxs) {
-      map.put(MockDatanodeDetails.randomDatanodeDetails(), i);
-    }
-    return map;
-  }
-
-  /**
-   * Given a set of data buffers, generate the parity data for the inputs.
-   * @param data A set of data buffers
-   * @param ecConfig The ECReplicationConfig representing the scheme
-   * @return
-   * @throws IOException
-   */
-  private ByteBuffer[] generateParity(ByteBuffer[] data,
-      ECReplicationConfig ecConfig) throws IOException {
-    // First data buffer dictates the size
-    int cellSize = data[0].limit();
-    // Store the positions of the remaining data buffers so we can restore them
-    int[] dataLimits = new int[data.length];
-    for (int i=1; i<data.length; i++) {
-      dataLimits[i] = data[i].limit();
-      data[i].limit(cellSize);
-      zeroFill(data[i]);
-      data[i].flip();
-    }
-    ByteBuffer[] parity = new ByteBuffer[ecConfig.getParity()];
-    for (int i = 0; i < ecConfig.getParity(); i++) {
-      parity[i] = ByteBuffer.allocate(cellSize);
-    }
-    RawErasureEncoder encoder = CodecRegistry.getInstance()
-        .getCodecFactory(repConfig.getCodec().toString())
-        .createEncoder(repConfig);
-    encoder.encode(data, parity);
-
-    data[0].flip();
-    for (int i = 1; i < data.length; i++) {
-      data[i].limit(dataLimits[i]);
-      data[i].position(0);
-    }
-    return parity;
-  }
-
-  /**
-   * Fill the remaining space in a buffer random bytes.
-   * @param buf
-   */
-  private void randomFill(ByteBuffer buf) {
-    while (buf.hasRemaining()) {
-      buf.put((byte)ThreadLocalRandom.current().nextInt(255));
-    }
-    buf.flip();
-  }
-
-  /**
-   * Fill / Pad the remaining space in a buffer with zeros.
-   * @param buf
-   */
-  private void zeroFill(ByteBuffer buf) {
-    byte[] a = buf.array();
-    Arrays.fill(a, buf.position(), buf.limit(), (byte)0);
-    buf.position(buf.limit());
-  }
-
-  /**
    * Return a list of num ByteBuffers of the given size.
    * @param num Number of buffers to create
    * @param size The size of each buffer
@@ -678,156 +589,4 @@ public class TestECBlockReconstructedStripeInputStream {
     return bufs;
   }
 
-  private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
-      long blockLength, Map<DatanodeDetails, Integer> dnMap) {
-
-    Pipeline pipeline = Pipeline.newBuilder()
-        .setState(Pipeline.PipelineState.CLOSED)
-        .setId(PipelineID.randomId())
-        .setNodes(new ArrayList<>(dnMap.keySet()))
-        .setReplicaIndexes(dnMap)
-        .setReplicationConfig(repConf)
-        .build();
-
-    OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
-        .setBlockID(new BlockID(1, 1))
-        .setLength(blockLength)
-        .setOffset(0)
-        .setPipeline(pipeline)
-        .setPartNumber(0)
-        .build();
-    return keyInfo;
-  }
-
-  private OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
-      int nodeCount, long blockLength) {
-    Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
-    for (int i = 0; i < nodeCount; i++) {
-      datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
-    }
-    return createKeyInfo(repConf, blockLength, datanodes);
-  }
-
-  private static class TestBlockInputStreamFactory implements
-      BlockInputStreamFactory {
-
-    private List<TestBlockInputStream> blockStreams = new ArrayList<>();
-    private List<ByteBuffer> blockStreamData;
-
-    private Pipeline currentPipeline;
-
-    public List<TestBlockInputStream> getBlockStreams() {
-      return blockStreams;
-    }
-
-    public void setBlockStreamData(List<ByteBuffer> bufs) {
-      this.blockStreamData = bufs;
-    }
-
-    public void setCurrentPipeline(Pipeline pipeline) {
-      this.currentPipeline = pipeline;
-    }
-
-    public BlockExtendedInputStream create(ReplicationConfig repConfig,
-        OmKeyLocationInfo blockInfo, Pipeline pipeline,
-        Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
-        XceiverClientFactory xceiverFactory,
-        Function<BlockID, Pipeline> refreshFunction) {
-
-      int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
-      TestBlockInputStream stream = new TestBlockInputStream(
-          blockInfo.getBlockID(), blockInfo.getLength(),
-          blockStreamData.get(repInd - 1));
-      blockStreams.add(stream);
-      return stream;
-    }
-  }
-
-  private static class TestBlockInputStream extends BlockExtendedInputStream {
-
-    private ByteBuffer data;
-    private boolean closed = false;
-    private BlockID blockID;
-    private long length;
-    private boolean shouldError = false;
-    private static final byte EOF = -1;
-
-    TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data) {
-      this.blockID = blockId;
-      this.length = blockLen;
-      this.data = data;
-      data.position(0);
-    }
-
-    public boolean isClosed() {
-      return closed;
-    }
-
-    public void setShouldError(boolean val) {
-      shouldError = val;
-    }
-
-    @Override
-    public BlockID getBlockID() {
-      return blockID;
-    }
-
-    @Override
-    public long getLength() {
-      return length;
-    }
-
-    @Override
-    public long getRemaining() {
-      return data.remaining();
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len)
-        throws IOException {
-      return read(ByteBuffer.wrap(b, off, len));
-    }
-
-    @Override
-    public int read(ByteBuffer buf) throws IOException {
-      if (shouldError) {
-        throw new IOException("Simulated error reading block");
-      }
-      if (getRemaining() == 0) {
-        return EOF;
-      }
-      int toRead = Math.min(buf.remaining(), (int)getRemaining());
-      for (int i = 0; i < toRead; i++) {
-        buf.put(data.get());
-      }
-      return toRead;
-    };
-
-    @Override
-    protected int readWithStrategy(ByteReaderStrategy strategy) throws
-        IOException {
-      throw new IOException("Should not be called");
-    }
-
-    @Override
-    public void close() {
-      closed = true;
-    }
-
-    @Override
-    public void unbuffer() {
-    }
-
-    @Override
-    public long getPos() {
-      return data.position();
-    }
-
-    @Override
-    public void seek(long pos) {
-      data.position((int)pos);
-    }
-
-  }
-
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to