This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new a1d7292  HDDS-6010. EC: Create ECBlockInputStreamProxy to choose between reconstruction and normal reads (#2889)
a1d7292 is described below

commit a1d7292bcf7b698b82308baf1acc50f7f52fa0d4
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Dec 8 09:43:39 2021 +0000

    HDDS-6010. EC: Create ECBlockInputStreamProxy to choose between reconstruction and normal reads (#2889)
---
 .../ozone/client/io/BadDataLocationException.java  |  56 +++
 .../client/io/BlockInputStreamFactoryImpl.java     |  12 +-
 .../hadoop/ozone/client/io/ECBlockInputStream.java |  19 +-
 ...oryImpl.java => ECBlockInputStreamFactory.java} |  46 +--
 ...mpl.java => ECBlockInputStreamFactoryImpl.java} |  58 +--
 .../ozone/client/io/ECBlockInputStreamProxy.java   | 220 ++++++++++++
 .../io/ECBlockReconstructedStripeInputStream.java  |  27 ++
 .../ozone/client/rpc/read/ECStreamTestUtil.java    |  54 ++-
 .../rpc/read/TestBlockInputStreamFactoryImpl.java  |   4 +-
 .../client/rpc/read/TestECBlockInputStream.java    |  40 ++-
 .../rpc/read/TestECBlockInputStreamProxy.java      | 395 +++++++++++++++++++++
 .../TestECBlockReconstructedStripeInputStream.java |  75 +++-
 12 files changed, 939 insertions(+), 67 deletions(-)

diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
new file mode 100644
index 0000000..dc8f7b0
--- /dev/null
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.IOException;
+
+/**
+ * Exception used to indicate a problem with a specific block location, allowing
+ * the failed location to be communicated back to the caller.
+ */
+public class BadDataLocationException extends IOException {
+
+  private DatanodeDetails failedLocation;
+
+  public BadDataLocationException(DatanodeDetails dn) {
+    super();
+    failedLocation = dn;
+  }
+
+  public BadDataLocationException(DatanodeDetails dn, String message) {
+    super(message);
+    failedLocation = dn;
+  }
+
+  public BadDataLocationException(DatanodeDetails dn, String message,
+      Throwable ex) {
+    super(message, ex);
+    failedLocation = dn;
+  }
+
+  public BadDataLocationException(DatanodeDetails dn, Throwable ex) {
+    super(ex);
+    failedLocation = dn;
+  }
+
+  public DatanodeDetails getFailedLocation() {
+    return failedLocation;
+  }
+}
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 1d372a7..6cdb991 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -36,10 +36,17 @@ import java.util.function.Function;
  */
 public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
 
+  private ECBlockInputStreamFactory ecBlockStreamFactory;
+
   public static BlockInputStreamFactory getInstance() {
     return new BlockInputStreamFactoryImpl();
   }
 
+  public BlockInputStreamFactoryImpl() {
+    this.ecBlockStreamFactory =
+        ECBlockInputStreamFactoryImpl.getInstance(this);
+  }
+
   /**
    * Create a new BlockInputStream based on the replication Config. If the
    * replication Config indicates the block is EC, then it will create an
@@ -59,8 +66,9 @@ public class BlockInputStreamFactoryImpl implements 
BlockInputStreamFactory {
       XceiverClientFactory xceiverFactory,
       Function<BlockID, Pipeline> refreshFunction) {
     if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
-      return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
-          verifyChecksum, xceiverFactory, refreshFunction, this);
+      return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
+          blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
+          ecBlockStreamFactory);
     } else {
       return new BlockInputStream(blockInfo.getBlockID(), 
blockInfo.getLength(),
           pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 07333d2..5212eea 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -131,9 +131,7 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
   }
 
   protected int calculateExpectedDataBlocks(ECReplicationConfig rConfig) {
-    return (int)Math.min(Math.ceil(
-        (double)getBlockInfo().getLength() / rConfig.getEcChunkSize()),
-        rConfig.getData());
+    return ECBlockInputStreamProxy.expectedDataLocations(rConfig, getLength());
   }
 
   /**
@@ -256,11 +254,16 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
 
     int totalRead = 0;
     while(strategy.getTargetLength() > 0 && remaining() > 0) {
-      int currentIndex = currentStreamIndex();
-      BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
-      int read = readFromStream(stream, strategy);
-      totalRead += read;
-      position += read;
+      try {
+        int currentIndex = currentStreamIndex();
+        BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
+        int read = readFromStream(stream, strategy);
+        totalRead += read;
+        position += read;
+      } catch (IOException ioe) {
+        throw new BadDataLocationException(
+            dataLocations[currentStreamIndex()], ioe);
+      }
     }
     return totalRead;
   }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
similarity index 52%
copy from 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
copy to 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
index 1d372a7..6c39e93 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
@@ -18,53 +18,41 @@
 package org.apache.hadoop.ozone.client.io;
 
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
-import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.security.token.Token;
 
+import java.util.List;
 import java.util.function.Function;
 
 /**
- * Factory class to create various BlockStream instances.
+ * Interface used by factories which create ECBlockInput streams for
+ * reconstruction or non-reconstruction reads.
  */
-public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
-
-  public static BlockInputStreamFactory getInstance() {
-    return new BlockInputStreamFactoryImpl();
-  }
+public interface ECBlockInputStreamFactory {
 
   /**
-   * Create a new BlockInputStream based on the replication Config. If the
-   * replication Config indicates the block is EC, then it will create an
-   * ECBlockInputStream, otherwise a BlockInputStream will be returned.
+   * Create a new EC InputStream based on the missingLocations boolean. If it is
+   * set to false, it indicates all locations are available and an
+   * ECBlockInputStream will be created. Otherwise an
+   * ECBlockReconstructedInputStream will be created.
+   * @param missingLocations Indicates if all the data locations are available
+   *                         or not, controlling the type of stream created
+   * @param failedLocations List of DatanodeDetails indicating locations we
+   *                        know are bad and should not be used.
    * @param repConfig The replication Config
    * @param blockInfo The blockInfo representing the block.
-   * @param pipeline The pipeline to be used for reading the block
-   * @param token The block Access Token
    * @param verifyChecksum Whether to verify checksums or not.
    * @param xceiverFactory Factory to create the xceiver in the client
    * @param refreshFunction Function to refresh the pipeline if needed
    * @return BlockExtendedInputStream of the correct type.
    */
-  public BlockExtendedInputStream create(ReplicationConfig repConfig,
-      OmKeyLocationInfo blockInfo, Pipeline pipeline,
-      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+  BlockExtendedInputStream create(boolean missingLocations,
+      List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
       XceiverClientFactory xceiverFactory,
-      Function<BlockID, Pipeline> refreshFunction) {
-    if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
-      return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
-          verifyChecksum, xceiverFactory, refreshFunction, this);
-    } else {
-      return new BlockInputStream(blockInfo.getBlockID(), 
blockInfo.getLength(),
-          pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
-    }
-  }
-
+      Function<BlockID, Pipeline> refreshFunction);
 }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
similarity index 50%
copy from 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
copy to 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
index 1d372a7..75956c4 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
@@ -20,50 +20,68 @@ package org.apache.hadoop.ozone.client.io;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
-import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.security.token.Token;
 
+import java.util.List;
 import java.util.function.Function;
 
 /**
  * Factory class to create various BlockStream instances.
  */
-public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
+public final class ECBlockInputStreamFactoryImpl implements
+    ECBlockInputStreamFactory {
 
-  public static BlockInputStreamFactory getInstance() {
-    return new BlockInputStreamFactoryImpl();
+  private final BlockInputStreamFactory inputStreamFactory;
+
+  public static ECBlockInputStreamFactory getInstance(
+      BlockInputStreamFactory streamFactory) {
+    return new ECBlockInputStreamFactoryImpl(streamFactory);
+  }
+
+  private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory) 
{
+    this.inputStreamFactory = streamFactory;
   }
 
   /**
-   * Create a new BlockInputStream based on the replication Config. If the
-   * replication Config indicates the block is EC, then it will create an
-   * ECBlockInputStream, otherwise a BlockInputStream will be returned.
+   * Create a new EC InputStream based on the missingLocations boolean. If it is
+   * set to false, it indicates all locations are available and an
+   * ECBlockInputStream will be created. Otherwise an
+   * ECBlockReconstructedInputStream will be created.
+   * @param missingLocations Indicates if all the data locations are available
+   *                         or not, controlling the type of stream created
+   * @param failedLocations List of DatanodeDetails indicating locations we
+   *                        know are bad and should not be used.
    * @param repConfig The replication Config
    * @param blockInfo The blockInfo representing the block.
-   * @param pipeline The pipeline to be used for reading the block
-   * @param token The block Access Token
    * @param verifyChecksum Whether to verify checksums or not.
    * @param xceiverFactory Factory to create the xceiver in the client
    * @param refreshFunction Function to refresh the pipeline if needed
    * @return BlockExtendedInputStream of the correct type.
    */
-  public BlockExtendedInputStream create(ReplicationConfig repConfig,
-      OmKeyLocationInfo blockInfo, Pipeline pipeline,
-      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+  public BlockExtendedInputStream create(boolean missingLocations,
+      List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
       XceiverClientFactory xceiverFactory,
       Function<BlockID, Pipeline> refreshFunction) {
-    if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
-      return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
-          verifyChecksum, xceiverFactory, refreshFunction, this);
+    if (missingLocations) {
+      // We create the reconstruction reader
+      ECBlockReconstructedStripeInputStream sis =
+          new ECBlockReconstructedStripeInputStream(
+              (ECReplicationConfig)repConfig, blockInfo, verifyChecksum,
+              xceiverFactory, refreshFunction, inputStreamFactory);
+      if (failedLocations != null) {
+        sis.addFailedDatanodes(failedLocations);
+      }
+      return new ECBlockReconstructedInputStream(
+          (ECReplicationConfig) repConfig, sis);
     } else {
-      return new BlockInputStream(blockInfo.getBlockID(), 
blockInfo.getLength(),
-          pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
+      // Otherwise create the more efficient non-reconstruction reader
+      return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
+          verifyChecksum, xceiverFactory, refreshFunction, inputStreamFactory);
     }
   }
 
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
new file mode 100644
index 0000000..2ed173c
--- /dev/null
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Top level class used to read data from EC Encoded blocks. This class decides,
+ * based on the block availability, whether to use a reconstruction or
+ * non-reconstruction read, and also handles errors from non-reconstruction
+ * reads by failing over to a reconstruction read when they happen.
+ */
+public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ECBlockInputStreamProxy.class);
+
+  private final ECReplicationConfig repConfig;
+  private final boolean verifyChecksum;
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+  private final OmKeyLocationInfo blockInfo;
+  private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
+
+  private BlockExtendedInputStream blockReader;
+  private boolean reconstructionReader = false;
+  private List<DatanodeDetails> failedLocations = new ArrayList<>();
+
+  /**
+   * Given the ECReplicationConfig and the block length, calculate how many
+   * data locations the block should have.
+   * @param repConfig The EC Replication Config
+   * @param blockLength The length of the data block in bytes
+   * @return The number of expected data locations
+   */
+  public static int expectedDataLocations(ECReplicationConfig repConfig,
+      long blockLength) {
+    return (int)Math.min(
+        Math.ceil((double)blockLength / repConfig.getEcChunkSize()),
+        repConfig.getData());
+  }
+
+  /**
+   * From ECReplicationConfig and Pipeline with the block locations and location
+   * indexes, determine the number of data locations available.
+   * @param repConfig The EC Replication Config
+   * @param pipeline The pipeline for the data block, giving its locations and
+   *                 the index of each location.
+   * @return The number of locations available
+   */
+  public static int availableDataLocations(ECReplicationConfig repConfig,
+      Pipeline pipeline) {
+    Set<Integer> locations = new HashSet<>();
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      int index = pipeline.getReplicaIndex(dn);
+      if (index > 0 && index <= repConfig.getData()) {
+        locations.add(index);
+      }
+    }
+    return locations.size();
+  }
+
+  public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory, Function<BlockID,
+      Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
+    this.repConfig = repConfig;
+    this.verifyChecksum = verifyChecksum;
+    this.blockInfo = blockInfo;
+    this.ecBlockInputStreamFactory = streamFactory;
+    this.xceiverClientFactory = xceiverClientFactory;
+    this.refreshFunction = refreshFunction;
+
+    setReaderType();
+    createBlockReader();
+  }
+
+  private synchronized void setReaderType() {
+    int expected = expectedDataLocations(repConfig, getLength());
+    int available = availableDataLocations(repConfig, blockInfo.getPipeline());
+    reconstructionReader = available < expected;
+  }
+
+  private void createBlockReader() {
+    blockReader = ecBlockInputStreamFactory.create(reconstructionReader,
+        failedLocations, repConfig, blockInfo, verifyChecksum,
+        xceiverClientFactory, refreshFunction);
+  }
+
+  @Override
+  public synchronized BlockID getBlockID() {
+    return blockInfo.getBlockID();
+  }
+
+  @Override
+  public synchronized long getRemaining() {
+    return blockReader.getRemaining();
+  }
+
+  @Override
+  public synchronized long getLength() {
+    return blockInfo.getLength();
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len)
+      throws IOException {
+    return read(ByteBuffer.wrap(b, off, len));
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    if (blockReader.getRemaining() == 0) {
+      return EOF;
+    }
+    int totalRead = 0;
+    long lastPosition = 0;
+    try {
+      while (buf.hasRemaining() && getRemaining() > 0) {
+        buf.mark();
+        lastPosition = blockReader.getPos();
+        totalRead += blockReader.read(buf);
+      }
+    } catch (IOException e) {
+      if (reconstructionReader) {
+        // If we get an error from the reconstruction reader, there
+        // is nothing left to try. It will re-try until it has insufficient
+        // locations internally, so if an error comes here, just re-throw it.
+        throw e;
+      }
+      if (e instanceof BadDataLocationException) {
+        LOG.warn("Failing over to reconstruction read due to an error in " +
+            "ECBlockReader", e);
+        failoverToReconstructionRead(
+            ((BadDataLocationException) e).getFailedLocation(), lastPosition);
+        buf.reset();
+        totalRead += read(buf);
+      } else {
+        throw e;
+      }
+    }
+    return totalRead;
+  }
+
+  private synchronized void failoverToReconstructionRead(
+      DatanodeDetails badLocation, long lastPosition) throws IOException {
+    if (badLocation != null) {
+      failedLocations.add(badLocation);
+    }
+    blockReader.close();
+    reconstructionReader = true;
+    createBlockReader();
+    if (lastPosition != 0) {
+      blockReader.seek(lastPosition);
+    }
+  }
+
+  /**
+   * Should never be called in this class.
+   */
+  @Override
+  protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
+      throws IOException {
+    throw new IOException("Not Implemented");
+  }
+
+  @Override
+  public synchronized void unbuffer() {
+    blockReader.unbuffer();
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    return blockReader != null ? blockReader.getPos() : 0;
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    try {
+      blockReader.seek(pos);
+    } catch (IOException e) {
+      if (reconstructionReader) {
+        throw e;
+      }
+      failoverToReconstructionRead(null, pos);
+    }
+  }
+}
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index b6478c5..a918eb4 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -120,6 +120,33 @@ public class ECBlockReconstructedStripeInputStream extends 
ECBlockInputStream {
     decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
   }
 
+  /**
+   * Provide a list of datanodes that are known to be bad, and no attempt will
+   * be made to read from them. If too many failed nodes are passed, then the
+   * reader may not have sufficient locations available to reconstruct the data.
+   *
+   * Note this call must be made before any attempt is made to read data,
+   * as that is when the reader is initialized. Attempting to call this method
+   * after a read will result in a runtime exception.
+   *
+   * @param dns A list of DatanodeDetails that are known to be bad.
+   */
+  public void addFailedDatanodes(List<DatanodeDetails> dns) {
+    if (initialized) {
+      throw new RuntimeException("Cannot add failed datanodes after the " +
+          "reader has been initialized");
+    }
+    DatanodeDetails[] locations = getDataLocations();
+    for (DatanodeDetails dn : dns) {
+      for (int i = 0; i < locations.length; i++) {
+        if (locations[i] != null && locations[i].equals(dn)) {
+          failedDataIndexes.add(i);
+          break;
+        }
+      }
+    }
+  }
+
   protected void init() throws InsufficientLocationsException {
     if (!hasSufficientLocations()) {
       throw new InsufficientLocationsException("There are insufficient " +
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
index aabec31..c2f51c2 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
@@ -125,6 +125,12 @@ public final class ECStreamTestUtil {
     }
   }
 
+  public static void randomFill(ByteBuffer buf, SplittableRandom rand) {
+    while (buf.remaining() > 0) {
+      buf.put((byte) rand.nextInt(255));
+    }
+  }
+
   private static int totalSpaceAvailable(ByteBuffer[] bufs) {
     int space = 0;
     for (ByteBuffer b : bufs) {
@@ -239,7 +245,7 @@ public final class ECStreamTestUtil {
       int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
       TestBlockInputStream stream = new TestBlockInputStream(
           blockInfo.getBlockID(), blockInfo.getLength(),
-          blockStreamData.get(repInd - 1));
+          blockStreamData.get(repInd - 1), repInd);
       blockStreams.add(stream);
       return stream;
     }
@@ -256,12 +262,22 @@ public final class ECStreamTestUtil {
     private BlockID blockID;
     private long length;
     private boolean shouldError = false;
+    private int shouldErrorPosition = 0;
+    private boolean shouldErrorOnSeek = false;
+    private IOException errorToThrow = null;
+    private int ecReplicaIndex = 0;
     private static final byte EOF = -1;
 
     TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data) {
+      this(blockId, blockLen, data, 0);
+    }
+
+    TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data,
+        int replicaIndex) {
       this.blockID = blockId;
       this.length = blockLen;
       this.data = data;
+      this.ecReplicaIndex = replicaIndex;
       data.position(0);
     }
 
@@ -269,8 +285,24 @@ public final class ECStreamTestUtil {
       return closed;
     }
 
+    public void setShouldErrorOnSeek(boolean val) {
+      this.shouldErrorOnSeek = val;
+    }
+
     public void setShouldError(boolean val) {
       shouldError = val;
+      shouldErrorPosition = 0;
+    }
+
+    public void setShouldError(boolean val, int position,
+        IOException errorThrowable) {
+      this.shouldError = val;
+      this.shouldErrorPosition = position;
+      this.errorToThrow = errorThrowable;
+    }
+
+    public int getEcReplicaIndex() {
+      return ecReplicaIndex;
     }
 
     @Override
@@ -296,19 +328,30 @@ public final class ECStreamTestUtil {
 
     @Override
     public int read(ByteBuffer buf) throws IOException {
-      if (shouldError) {
-        throw new IOException("Simulated error reading block");
+      if (shouldError && data.position() >= shouldErrorPosition) {
+        throwError();
       }
       if (getRemaining() == 0) {
         return EOF;
       }
       int toRead = Math.min(buf.remaining(), (int)getRemaining());
       for (int i = 0; i < toRead; i++) {
+        if (shouldError && data.position() >= shouldErrorPosition) {
+          throwError();
+        }
         buf.put(data.get());
       }
       return toRead;
     };
 
+    private void throwError() throws IOException {
+      if (errorToThrow != null) {
+        throw errorToThrow;
+      } else {
+        throw new IOException("Simulated error reading block");
+      }
+    }
+
     @Override
     protected int readWithStrategy(ByteReaderStrategy strategy) throws
         IOException {
@@ -330,7 +373,10 @@ public final class ECStreamTestUtil {
     }
 
     @Override
-    public void seek(long pos) {
+    public void seek(long pos) throws IOException {
+      if (shouldErrorOnSeek) {
+        throw new IOException("Simulated exception");
+      }
       data.position((int)pos);
     }
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
index 84e1817..badb33b 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
@@ -30,7 +30,7 @@ import 
org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
 import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
-import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.junit.Test;
 import org.junit.Assert;
@@ -73,7 +73,7 @@ public class TestBlockInputStreamFactoryImpl {
     BlockExtendedInputStream stream =
         factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
             blockInfo.getToken(), true, null, null);
-    Assert.assertTrue(stream instanceof ECBlockInputStream);
+    Assert.assertTrue(stream instanceof ECBlockInputStreamProxy);
     Assert.assertEquals(stream.getBlockID(), blockInfo.getBlockID());
     Assert.assertEquals(stream.getLength(), blockInfo.getLength());
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
index 5a29626..d681c61 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BadDataLocationException;
 import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
 import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -348,6 +349,34 @@ public class TestECBlockInputStream {
     }
   }
 
+  @Test
+  public void testErrorReadingBlockReportsBadLocation() throws IOException {
+    repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+        ONEMB);
+    OmKeyLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+        keyInfo, true, null, null, streamFactory)) {
+      // Read a full stripe to ensure all streams are created in the stream
+      // factory
+      ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
+      int read = ecb.read(buf);
+      Assert.assertEquals(3 * ONEMB, read);
+      // Now make replication index 2 error on the next read
+      streamFactory.getBlockStreams().get(1).setThrowException(true);
+      buf.clear();
+      try {
+        ecb.read(buf);
+        Assert.fail("Exception should be thrown");
+      } catch (IOException e) {
+        Assert.assertTrue(e instanceof BadDataLocationException);
+        Assert.assertEquals(2,
+            keyInfo.getPipeline().getReplicaIndex(
+                ((BadDataLocationException) e).getFailedLocation()));
+      }
+    }
+  }
+
   private void validateBufferContents(ByteBuffer buf, int from, int to,
       byte val) {
     for (int i=from; i<to; i++){
@@ -384,6 +413,7 @@ public class TestECBlockInputStream {
     private byte dataVal = 1;
     private BlockID blockID;
     private long length;
+    private boolean throwException = false;
     private static final byte EOF = -1;
 
     @SuppressWarnings("checkstyle:parameternumber")
@@ -397,6 +427,10 @@ public class TestECBlockInputStream {
       return closed;
     }
 
+    public void setThrowException(boolean shouldThrow) {
+      this.throwException = shouldThrow;
+    }
+
     @Override
     public BlockID getBlockID() {
       return blockID;
@@ -419,11 +453,15 @@ public class TestECBlockInputStream {
     }
 
     @Override
-    public int read(ByteBuffer buf) {
+    public int read(ByteBuffer buf) throws IOException {
       if (getRemaining() == 0) {
         return EOF;
       }
 
+      if (throwException) {
+        throw new IOException("Simulated exception");
+      }
+
       int toRead = Math.min(buf.remaining(), (int)getRemaining());
       for (int i=0; i<toRead; i++) {
         buf.put(dataVal);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
new file mode 100644
index 0000000..f9b5f38
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.ozone.client.io.BadDataLocationException;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SplittableRandom;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+
+/**
+ * Unit tests for the ECBlockInputStreamProxy class.
+ */
+public class TestECBlockInputStreamProxy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestECBlockInputStreamProxy.class);
+
+  private static final int ONEMB = 1024 * 1024;
+  private ECReplicationConfig repConfig;
+  private TestECBlockInputStreamFactory streamFactory;
+
+  private long randomSeed;
+  private ThreadLocalRandom random = ThreadLocalRandom.current();
+  private SplittableRandom dataGenerator;
+
+  @Before
+  public void setup() {
+    repConfig = new ECReplicationConfig(3, 2);
+    streamFactory = new TestECBlockInputStreamFactory();
+    randomSeed = random.nextLong();
+    dataGenerator = new SplittableRandom(randomSeed);
+  }
+
+  @Test
+  public void testExpectedDataLocations() {
+    Assert.assertEquals(1,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 1));
+    Assert.assertEquals(2,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, ONEMB + 1));
+    Assert.assertEquals(3,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 3 * ONEMB));
+    Assert.assertEquals(3,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 10 * ONEMB));
+
+    repConfig = new ECReplicationConfig(6, 3);
+    Assert.assertEquals(1,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 1));
+    Assert.assertEquals(2,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, ONEMB + 1));
+    Assert.assertEquals(3,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 3 * ONEMB));
+    Assert.assertEquals(6,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 10 * ONEMB));
+  }
+
+  @Test
+  public void testAvailableDataLocations() {
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 1024, dnMap);
+    Assert.assertEquals(2, ECBlockInputStreamProxy.availableDataLocations(
+        repConfig, blockInfo.getPipeline()));
+
+    dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
+    blockInfo = ECStreamTestUtil.createKeyInfo(repConfig, 1024, dnMap);
+    Assert.assertEquals(1, ECBlockInputStreamProxy.availableDataLocations(
+        repConfig, blockInfo.getPipeline()));
+
+    dnMap = ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    blockInfo = ECStreamTestUtil.createKeyInfo(repConfig, 1024, dnMap);
+    Assert.assertEquals(3, ECBlockInputStreamProxy.availableDataLocations(
+        repConfig, blockInfo.getPipeline()));
+  }
+
+  @Test
+  public void testBlockIDCanBeRetrieved() throws IOException {
+    int blockLength = 1234;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      Assert.assertEquals(blockInfo.getBlockID(), bis.getBlockID());
+    }
+  }
+
+  @Test
+  public void testBlockLengthCanBeRetrieved() throws IOException {
+    int blockLength = 1234;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      Assert.assertEquals(1234, bis.getLength());
+    }
+  }
+
+  @Test
+  public void testBlockRemainingCanBeRetrieved() throws IOException {
+    int blockLength = 12345;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    dataGenerator = new SplittableRandom(randomSeed);
+    ByteBuffer readBuffer = ByteBuffer.allocate(100);
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      Assert.assertEquals(12345, bis.getRemaining());
+      Assert.assertEquals(0, bis.getPos());
+      bis.read(readBuffer);
+      Assert.assertEquals(12345 - 100, bis.getRemaining());
+      Assert.assertEquals(100, bis.getPos());
+    }
+  }
+
+  @Test
+  public void testCorrectStreamCreatedDependingOnDataLocations()
+      throws IOException {
+    int blockLength = 5 * ONEMB;
+    ByteBuffer data = generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      // All locations are present, so we expect only the "missing=false"
+      // stream to be present.
+      Assert.assertTrue(streamFactory.getStreams().containsKey(false));
+      Assert.assertFalse(streamFactory.getStreams().containsKey(true));
+    }
+
+    streamFactory = new TestECBlockInputStreamFactory();
+    streamFactory.setData(data);
+    dnMap = ECStreamTestUtil.createIndexMap(2, 3, 4, 5);
+    blockInfo = ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      // Not all locations present, so we expect only the "missing=true" stream
+      // to be present.
+      Assert.assertFalse(streamFactory.getStreams().containsKey(false));
+      Assert.assertTrue(streamFactory.getStreams().containsKey(true));
+    }
+  }
+
+  @Test
+  public void testCanReadNonReconstructionToEOF()
+      throws IOException {
+    int blockLength = 5 * ONEMB;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    ByteBuffer readBuffer = ByteBuffer.allocate(100);
+    dataGenerator = new SplittableRandom(randomSeed);
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      while(true) {
+        int read = bis.read(readBuffer);
+        ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+        readBuffer.clear();
+        if (read < 100) {
+          break;
+        }
+      }
+      readBuffer.clear();
+      int read = bis.read(readBuffer);
+      Assert.assertEquals(-1, read);
+    }
+  }
+
+  @Test
+  public void testCanReadReconstructionToEOF()
+      throws IOException {
+    int blockLength = 5 * ONEMB;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    ByteBuffer readBuffer = ByteBuffer.allocate(100);
+    dataGenerator = new SplittableRandom(randomSeed);
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      while(true) {
+        int read = bis.read(readBuffer);
+        ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+        readBuffer.clear();
+        if (read < 100) {
+          break;
+        }
+      }
+      readBuffer.clear();
+      int read = bis.read(readBuffer);
+      Assert.assertEquals(-1, read);
+    }
+  }
+
+  @Test
+  public void testCanHandleErrorAndFailOverToReconstruction()
+      throws IOException {
+    int blockLength = 5 * ONEMB;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    ByteBuffer readBuffer = ByteBuffer.allocate(100);
+    DatanodeDetails badDN = blockInfo.getPipeline().getFirstNode();
+
+    dataGenerator = new SplittableRandom(randomSeed);
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      // Perform one read to get the stream created
+      int read = bis.read(readBuffer);
+      Assert.assertEquals(100, read);
+      ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+      // Setup an error to be thrown part way through a read, so the dataBuffer
+      // will have been advanced by 50 bytes before the error. This tests it
+      // correctly rewinds and the same data is loaded again from the other
+      // stream.
+      streamFactory.getStreams().get(false).setShouldError(true, 151,
+          new BadDataLocationException(badDN, "Simulated Error"));
+      while(true) {
+        readBuffer.clear();
+        read = bis.read(readBuffer);
+        ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+        if (read < 100) {
+          break;
+        }
+      }
+      readBuffer.clear();
+      read = bis.read(readBuffer);
+      Assert.assertEquals(-1, read);
+      // Ensure the bad location was passed into the factory to create the
+      // reconstruction reader
+      Assert.assertEquals(badDN, streamFactory.getFailedLocations().get(0));
+    }
+  }
+
+  @Test
+  public void testCanSeekToNewPosition() throws IOException {
+    int blockLength = 5 * ONEMB;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    ByteBuffer readBuffer = ByteBuffer.allocate(100);
+    dataGenerator = new SplittableRandom(randomSeed);
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      // Perform one read to get the stream created
+      int read = bis.read(readBuffer);
+      Assert.assertEquals(100, read);
+
+      bis.seek(1024);
+      readBuffer.clear();
+      resetAndAdvanceDataGenerator(1024);
+      bis.read(readBuffer);
+      ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+      Assert.assertEquals(1124, bis.getPos());
+
+      // Set the non-reconstruction reader to throw an exception on seek
+      streamFactory.getStreams().get(false).setShouldErrorOnSeek(true);
+      bis.seek(2048);
+      readBuffer.clear();
+      resetAndAdvanceDataGenerator(2048);
+      bis.read(readBuffer);
+      ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+
+      // Finally, set the recon reader to fail on seek.
+      streamFactory.getStreams().get(true).setShouldErrorOnSeek(true);
+      try {
+        bis.seek(1024);
+        Assert.fail("Seek should have raised an exception");
+      } catch (IOException e) {
+        // expected
+      }
+    }
+  }
+
+  private ByteBuffer generateData(int length) {
+    ByteBuffer data = ByteBuffer.allocate(length);
+    ECStreamTestUtil.randomFill(data, dataGenerator);
+    streamFactory.setData(data);
+    return data;
+  }
+
+  private void resetAndAdvanceDataGenerator(long position) {
+    dataGenerator = new SplittableRandom(randomSeed);
+    for (long i = 0; i < position; i++) {
+      dataGenerator.nextInt(255);
+    }
+  }
+
+  private ECBlockInputStreamProxy createBISProxy(ECReplicationConfig rConfig,
+      OmKeyLocationInfo blockInfo) {
+    return new ECBlockInputStreamProxy(
+        rConfig, blockInfo, true, null, null, streamFactory);
+  }
+
+  private static class TestECBlockInputStreamFactory
+      implements ECBlockInputStreamFactory {
+
+    private ByteBuffer data;
+
+    private Map<Boolean, ECStreamTestUtil.TestBlockInputStream> streams
+        = new HashMap<>();
+
+    private List<DatanodeDetails> failedLocations;
+
+    public void setData(ByteBuffer data) {
+      this.data = data;
+    }
+
+    public Map<Boolean, ECStreamTestUtil.TestBlockInputStream> getStreams() {
+      return streams;
+    }
+
+    public List<DatanodeDetails> getFailedLocations() {
+      return failedLocations;
+    }
+
+    @Override
+    public BlockExtendedInputStream create(boolean missingLocations,
+        List<DatanodeDetails> failedDatanodes,
+        ReplicationConfig repConfig, OmKeyLocationInfo blockInfo,
+        boolean verifyChecksum, XceiverClientFactory xceiverFactory,
+        Function<BlockID, Pipeline> refreshFunction) {
+      this.failedLocations = failedDatanodes;
+      ByteBuffer wrappedBuffer =
+          ByteBuffer.wrap(data.array(), 0, data.capacity());
+      ECStreamTestUtil.TestBlockInputStream is =
+          new ECStreamTestUtil.TestBlockInputStream(blockInfo.getBlockID(),
+              blockInfo.getLength(), wrappedBuffer);
+      streams.put(missingLocations, is);
+      return is;
+    }
+  }
+
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 875d2a7..0dda350 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -65,7 +65,7 @@ public class TestECBlockReconstructedStripeInputStream {
   }
 
   @Test
-  public void testSufficientLocations() {
+  public void testSufficientLocations() throws IOException {
     // One chunk, only 1 location.
     OmKeyLocationInfo keyInfo = ECStreamTestUtil
         .createKeyInfo(repConfig, 1, ONEMB);
@@ -94,6 +94,11 @@ public class TestECBlockReconstructedStripeInputStream {
         new ECBlockReconstructedStripeInputStream(repConfig,
         keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
       Assert.assertTrue(ecb.hasSufficientLocations());
+      // Set a failed location
+      List<DatanodeDetails> failed = new ArrayList<>();
+      failed.add(keyInfo.getPipeline().getFirstNode());
+      ((ECBlockReconstructedStripeInputStream)ecb).addFailedDatanodes(failed);
+      Assert.assertFalse(ecb.hasSufficientLocations());
     }
 
     // Three Chunks, but missing data block 2 and 3 and parity 1.
@@ -104,6 +109,25 @@ public class TestECBlockReconstructedStripeInputStream {
         keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
       Assert.assertFalse(ecb.hasSufficientLocations());
     }
+
+    // Three Chunks, all available but fail 3
+    dnMap = ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
+    try (ECBlockInputStream ecb =
+        new ECBlockReconstructedStripeInputStream(repConfig,
+        keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+      Assert.assertTrue(ecb.hasSufficientLocations());
+      // Set a failed location
+      List<DatanodeDetails> failed = new ArrayList<>();
+      for (DatanodeDetails dn : dnMap.keySet()) {
+        failed.add(dn);
+        if (failed.size() == 3) {
+          break;
+        }
+      }
+      ((ECBlockReconstructedStripeInputStream)ecb).addFailedDatanodes(failed);
+      Assert.assertFalse(ecb.hasSufficientLocations());
+    }
   }
 
   @Test
@@ -522,6 +546,55 @@ public class TestECBlockReconstructedStripeInputStream {
     }
   }
 
+  @Test
+  public void testFailedLocationsAreNotRead() throws IOException {
+    // Generate the input data for 3 full stripes and generate the parity.
+    int chunkSize = repConfig.getEcChunkSize();
+    int partialStripeSize = chunkSize * 2 - 1;
+    int blockLength = chunkSize * repConfig.getData() * 3 + partialStripeSize;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 * chunkSize);
+    ECStreamTestUtil.randomFill(dataBufs, chunkSize, dataGen, blockLength);
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+    streamFactory = new TestBlockInputStreamFactory();
+    addDataStreamsToFactory(dataBufs, parity);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+        stripeSize() * 3 + partialStripeSize, dnMap);
+    streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+    ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+    dataGen = new SplittableRandom(randomSeed);
+    try (ECBlockReconstructedStripeInputStream ecb =
+        new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+            null, null, streamFactory)) {
+      List<DatanodeDetails> failed = new ArrayList<>();
+      // Set the DNs mapped to index 1 and 2 as failed
+      for (Map.Entry<DatanodeDetails, Integer> e : dnMap.entrySet()) {
+        if (e.getValue() <= 2) {
+          failed.add(e.getKey());
+        }
+      }
+      ecb.addFailedDatanodes(failed);
+
+      // Read full stripe
+      int read = ecb.readStripe(bufs);
+      for (int j = 0; j < bufs.length; j++) {
+        ECStreamTestUtil.assertBufferMatches(bufs[j], dataGen);
+      }
+      Assert.assertEquals(stripeSize(), read);
+
+      // Now ensure that streams with repIndexes 1 and 2 have not been
+      // created in the stream factory, indicating we did not read them.
+      List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
+      for (TestBlockInputStream stream : streams) {
+        Assert.assertTrue(stream.getEcReplicaIndex() > 2);
+      }
+    }
+  }
+
   private List<Integer> indexesToList(int... indexes) {
     List<Integer> list = new ArrayList<>();
     for (int i : indexes) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to