umamaheswararao commented on a change in pull request #2889:
URL: https://github.com/apache/ozone/pull/2889#discussion_r763641894



##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Top level class used to read data from EC Encoded blocks. This class 
decides,
+ * based on the block availability, whether to use a reconstruction or non
+ * reconstruction read and also handles errors from the non-reconstruction 
reads
+ * failing over to a reconstruction read when they happen.
+ */
+public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
+
+  private final ECReplicationConfig repConfig;
+  private final boolean verifyChecksum;
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+  private final OmKeyLocationInfo blockInfo;
+  private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
+
+  private BlockExtendedInputStream blockReader;
+  private boolean reconstructionReader = false;
+  private List<DatanodeDetails> failedLocations = new ArrayList<>();
+
+  /**
+   * Given the ECReplicationConfig and the block length, calculate how many
+   * data locations the block should have.
+   * @param repConfig The EC Replication Config
+   * @param blockLength The length of the data block in bytes
+   * @return The number of expected data locations
+   */
+  public static int expectedDataLocations(ECReplicationConfig repConfig,
+      long blockLength) {
+    return (int)Math.min(
+        Math.ceil((double)blockLength / repConfig.getEcChunkSize()),
+        repConfig.getData());
+  }
+
+  /**
+   * From ECReplicationConfig and Pipeline with the block locations and 
location
+   * indexes, determine the number of data locations available.
+   * @param repConfig The EC Replication Config
+   * @param pipeline The pipeline for the data block, givings its locations and
+   *                 the index of each location.
+   * @return The number of locations available
+   */
+  public static int availableDataLocations(ECReplicationConfig repConfig,
+      Pipeline pipeline) {
+    Set<Integer> locations = new HashSet<>();
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      int index = pipeline.getReplicaIndex(dn);
+      if (index > 0 && index <= repConfig.getData()) {
+        locations.add(index);
+      }
+    }
+    return locations.size();
+  }
+
+  public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory, Function<BlockID,
+      Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
+    this.repConfig = repConfig;
+    this.verifyChecksum = verifyChecksum;
+    this.blockInfo = blockInfo;
+    this.ecBlockInputStreamFactory = streamFactory;
+    this.xceiverClientFactory = xceiverClientFactory;
+    this.refreshFunction = refreshFunction;
+
+    setReaderType();
+    createBlockReader();
+  }
+
+  private synchronized void setReaderType() {
+    int expected = expectedDataLocations(repConfig, getLength());
+    int available = availableDataLocations(repConfig, blockInfo.getPipeline());
+    reconstructionReader = available < expected;
+  }
+
+  private void createBlockReader() {
+    blockReader = ecBlockInputStreamFactory.create(reconstructionReader,
+        failedLocations, repConfig, blockInfo, verifyChecksum,
+        xceiverClientFactory, refreshFunction);
+  }
+
+  @Override
+  public synchronized BlockID getBlockID() {
+    return blockInfo.getBlockID();
+  }
+
+  @Override
+  public synchronized long getRemaining() {
+    return blockReader.getRemaining();
+  }
+
+  @Override
+  public synchronized long getLength() {
+    return blockInfo.getLength();
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len)
+      throws IOException {
+    return read(ByteBuffer.wrap(b, off, len));
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    if (blockReader.getRemaining() == 0) {
+      return EOF;
+    }
+    int totalRead = 0;
+    long lastPosition = 0;
+    try {
+      while (buf.hasRemaining() && getRemaining() > 0) {
+        buf.mark();
+        lastPosition = blockReader.getPos();
+        totalRead += blockReader.read(buf);
+      }
+    } catch (IOException e) {
+      if (reconstructionReader) {
+        // If we get an error from the reconstruction reader, there
+        // is nothing left to try. It will re-try until it has insufficient
+        // locations internally, so if an error comes here, just re-throw it.
+        throw e;
+      }
+      if (e instanceof BadDataLocationException) {

Review comment:
       I think a log message would be better here that we switched to 
reconstruction reader due to failure. Not sure we have logged that in the down 
layer already.

##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Top level class used to read data from EC Encoded blocks. This class 
decides,
+ * based on the block availability, whether to use a reconstruction or non
+ * reconstruction read and also handles errors from the non-reconstruction 
reads
+ * failing over to a reconstruction read when they happen.
+ */
+public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
+
+  private final ECReplicationConfig repConfig;
+  private final boolean verifyChecksum;
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+  private final OmKeyLocationInfo blockInfo;
+  private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
+
+  private BlockExtendedInputStream blockReader;
+  private boolean reconstructionReader = false;
+  private List<DatanodeDetails> failedLocations = new ArrayList<>();
+
+  /**
+   * Given the ECReplicationConfig and the block length, calculate how many
+   * data locations the block should have.
+   * @param repConfig The EC Replication Config
+   * @param blockLength The length of the data block in bytes
+   * @return The number of expected data locations
+   */
+  public static int expectedDataLocations(ECReplicationConfig repConfig,
+      long blockLength) {
+    return (int)Math.min(
+        Math.ceil((double)blockLength / repConfig.getEcChunkSize()),
+        repConfig.getData());
+  }
+
+  /**
+   * From ECReplicationConfig and Pipeline with the block locations and 
location
+   * indexes, determine the number of data locations available.
+   * @param repConfig The EC Replication Config
+   * @param pipeline The pipeline for the data block, givings its locations and
+   *                 the index of each location.
+   * @return The number of locations available
+   */
+  public static int availableDataLocations(ECReplicationConfig repConfig,
+      Pipeline pipeline) {
+    Set<Integer> locations = new HashSet<>();
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      int index = pipeline.getReplicaIndex(dn);
+      if (index > 0 && index <= repConfig.getData()) {
+        locations.add(index);
+      }
+    }
+    return locations.size();
+  }
+
+  public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory, Function<BlockID,
+      Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
+    this.repConfig = repConfig;
+    this.verifyChecksum = verifyChecksum;
+    this.blockInfo = blockInfo;
+    this.ecBlockInputStreamFactory = streamFactory;
+    this.xceiverClientFactory = xceiverClientFactory;
+    this.refreshFunction = refreshFunction;
+
+    setReaderType();
+    createBlockReader();
+  }
+
+  private synchronized void setReaderType() {
+    int expected = expectedDataLocations(repConfig, getLength());
+    int available = availableDataLocations(repConfig, blockInfo.getPipeline());
+    reconstructionReader = available < expected;
+  }
+
+  private void createBlockReader() {
+    blockReader = ecBlockInputStreamFactory.create(reconstructionReader,
+        failedLocations, repConfig, blockInfo, verifyChecksum,
+        xceiverClientFactory, refreshFunction);
+  }
+
+  @Override
+  public synchronized BlockID getBlockID() {
+    return blockInfo.getBlockID();
+  }
+
+  @Override
+  public synchronized long getRemaining() {
+    return blockReader.getRemaining();
+  }
+
+  @Override
+  public synchronized long getLength() {
+    return blockInfo.getLength();
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len)
+      throws IOException {
+    return read(ByteBuffer.wrap(b, off, len));
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    if (blockReader.getRemaining() == 0) {
+      return EOF;
+    }
+    int totalRead = 0;
+    long lastPosition = 0;
+    try {
+      while (buf.hasRemaining() && getRemaining() > 0) {
+        buf.mark();
+        lastPosition = blockReader.getPos();
+        totalRead += blockReader.read(buf);
+      }
+    } catch (IOException e) {
+      if (reconstructionReader) {
+        // If we get an error from the reconstruction reader, there
+        // is nothing left to try. It will re-try until it has insufficient
+        // locations internally, so if an error comes here, just re-throw it.
+        throw e;
+      }
+      if (e instanceof BadDataLocationException) {
+        failoverToReconstructionRead(
+            ((BadDataLocationException) e).getFailedLocation(), lastPosition);
+        buf.reset();
+        totalRead += read(buf);

Review comment:
       Looks like we are calling recursive here. Is there any possibility of 
getting this read method throws EOF? If so, we will corrupt the prior read 
length and wrongly return the length. I tried to track this, looks like it may 
not be possible as we are throwing EOF only at the top of read methods. But I 
would like you to re-check one more time in this aspect. ( example: in the 
corrupt bock case we are throwing IOE and here reconstruction read can throw 
EOF?)

##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Interface used by factories which create ECBlockInput streams for
+ * reconstruction or non-reconstruction reads.
+ */
+public interface ECBlockInputStreamFactory {
+
+  /**
+   * Create a new EC InputStream based on the missingLocations boolean. If it 
is
+   * set to false, it indicates all locations are available and aa

Review comment:
       aa --> an

##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -256,11 +254,16 @@ protected synchronized int 
readWithStrategy(ByteReaderStrategy strategy)
 
     int totalRead = 0;
     while(strategy.getTargetLength() > 0 && remaining() > 0) {
-      int currentIndex = currentStreamIndex();
-      BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
-      int read = readFromStream(stream, strategy);
-      totalRead += read;
-      position += read;
+      try {
+        int currentIndex = currentStreamIndex();
+        BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
+        int read = readFromStream(stream, strategy);
+        totalRead += read;
+        position += read;
+      } catch (IOException ioe) {

Review comment:
       When readFromStream throws the following exception, we don't need to 
throw BadLocationException I guess right?
   ```
     if (actualRead == -1) {
         // The Block Stream reached EOF, but we did not expect it to, so the 
block
         // might be corrupt.
         throw new IOException("Expected to read " + expectedRead + " but got 
EOF"
             + " from blockGroup " + stream.getBlockID() + " index "
             + currentStreamIndex()+1);
       }
   ```
   But it may be hard to differentiate unless we create other type of exception.

##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Top level class used to read data from EC Encoded blocks. This class 
decides,
+ * based on the block availability, whether to use a reconstruction or non
+ * reconstruction read and also handles errors from the non-reconstruction 
reads
+ * failing over to a reconstruction read when they happen.
+ */
+public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
+
+  private final ECReplicationConfig repConfig;
+  private final boolean verifyChecksum;
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+  private final OmKeyLocationInfo blockInfo;
+  private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
+
+  private BlockExtendedInputStream blockReader;
+  private boolean reconstructionReader = false;
+  private List<DatanodeDetails> failedLocations = new ArrayList<>();
+
+  /**
+   * Given the ECReplicationConfig and the block length, calculate how many
+   * data locations the block should have.
+   * @param repConfig The EC Replication Config
+   * @param blockLength The length of the data block in bytes
+   * @return The number of expected data locations
+   */

Review comment:
       I think this method present in ECBlockInputStream class.   
   
   ```
   protected int calculateExpectedDataBlocks(ECReplicationConfig rConfig) {
       return (int)Math.min(Math.ceil(
           (double)getBlockInfo().getLength() / rConfig.getEcChunkSize()),
           rConfig.getData());
     }
   ```
   
   May be a good idea to start using Util class to move this repeated methods?

##########
File path: 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
##########
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.ozone.client.io.BadDataLocationException;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SplittableRandom;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+
+/**
+ * Unit tests for the  ECBlockInputStreamProxy class.
+ */
+public class TestECBlockInputStreamProxy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestECBlockInputStreamProxy.class);
+
+  private static final int ONEMB = 1024 * 1024;
+  private ECReplicationConfig repConfig;
+  private TestECBlockInputStreamFactory streamFactory;
+
+  private long randomSeed;
+  private ThreadLocalRandom random = ThreadLocalRandom.current();
+  private SplittableRandom dataGenerator;
+
+  @Before
+  public void setup() {
+    repConfig = new ECReplicationConfig(3, 2);
+    streamFactory = new TestECBlockInputStreamFactory();
+    randomSeed = random.nextLong();
+    dataGenerator = new SplittableRandom(randomSeed);
+  }
+
+  @Test
+  public void testExpectedDataLocations() {
+    Assert.assertEquals(1,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 1));
+    Assert.assertEquals(2,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, ONEMB + 1));
+    Assert.assertEquals(3,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 3 * ONEMB));
+    Assert.assertEquals(3,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 10 * ONEMB));
+
+    repConfig = new ECReplicationConfig(6, 3);
+    Assert.assertEquals(1,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 1));
+    Assert.assertEquals(2,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, ONEMB + 1));
+    Assert.assertEquals(3,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 3 * ONEMB));
+    Assert.assertEquals(6,
+        ECBlockInputStreamProxy.expectedDataLocations(repConfig, 10 * ONEMB));
+  }
+
+  @Test
+  public void testAvailableDataLocations() {
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 1024, dnMap);
+    Assert.assertEquals(2, ECBlockInputStreamProxy.availableDataLocations(
+        repConfig, blockInfo.getPipeline()));
+
+    dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
+    blockInfo = ECStreamTestUtil.createKeyInfo(repConfig, 1024, dnMap);
+    Assert.assertEquals(1, ECBlockInputStreamProxy.availableDataLocations(
+        repConfig, blockInfo.getPipeline()));
+
+    dnMap = ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    blockInfo = ECStreamTestUtil.createKeyInfo(repConfig, 1024, dnMap);
+    Assert.assertEquals(3, ECBlockInputStreamProxy.availableDataLocations(
+        repConfig, blockInfo.getPipeline()));
+  }
+
+  @Test
+  public void testBlockIDCanBeRetrieved() throws IOException {
+    int blockLength = 1234;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      Assert.assertEquals(blockInfo.getBlockID(), bis.getBlockID());
+    }
+  }
+
+  @Test
+  public void testBlockLengthCanBeRetrieved() throws IOException {
+    int blockLength = 1234;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      Assert.assertEquals(1234, bis.getLength());
+    }
+  }
+
+  @Test
+  public void testBlockRemainingCanBeRetrieved() throws IOException {
+    int blockLength = 12345;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    dataGenerator = new SplittableRandom(randomSeed);
+    ByteBuffer readBuffer = ByteBuffer.allocate(100);
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      Assert.assertEquals(12345, bis.getRemaining());
+      Assert.assertEquals(0, bis.getPos());
+      bis.read(readBuffer);
+      Assert.assertEquals(12345 - 100, bis.getRemaining());
+      Assert.assertEquals(100, bis.getPos());
+    }
+  }
+
+  @Test
+  public void testCorrectStreamCreatedDependingOnDataLocations()
+      throws IOException {
+    int blockLength = 5 * ONEMB;
+    ByteBuffer data = generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      // Not all locations present, so we expect on;y the "missing=true" stream
+      // to be present.
+      Assert.assertTrue(streamFactory.getStreams().containsKey(false));
+      Assert.assertFalse(streamFactory.getStreams().containsKey(true));
+    }
+
+    streamFactory = new TestECBlockInputStreamFactory();
+    streamFactory.setData(data);
+    dnMap = ECStreamTestUtil.createIndexMap(2, 3, 4, 5);
+    blockInfo = ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      // Not all locations present, so we expect on;y the "missing=true" stream
+      // to be present.
+      Assert.assertFalse(streamFactory.getStreams().containsKey(false));
+      Assert.assertTrue(streamFactory.getStreams().containsKey(true));
+    }
+  }
+
+  @Test
+  public void testCanReadNonReconstructionToEOF()
+      throws IOException {
+    int blockLength = 5 * ONEMB;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    ByteBuffer readBuffer = ByteBuffer.allocate(100);
+    dataGenerator = new SplittableRandom(randomSeed);
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      while(true) {
+        int read = bis.read(readBuffer);
+        ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+        readBuffer.clear();
+        if (read < 100) {
+          break;
+        }
+      }
+      readBuffer.clear();
+      int read = bis.read(readBuffer);
+      Assert.assertEquals(-1, read);
+    }
+  }
+
+  @Test
+  public void testCanReadReconstructionToEOF()
+      throws IOException {
+    int blockLength = 5 * ONEMB;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    ByteBuffer readBuffer = ByteBuffer.allocate(100);
+    dataGenerator = new SplittableRandom(randomSeed);
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      while(true) {
+        int read = bis.read(readBuffer);
+        ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+        readBuffer.clear();
+        if (read < 100) {
+          break;
+        }
+      }
+      readBuffer.clear();
+      int read = bis.read(readBuffer);
+      Assert.assertEquals(-1, read);
+    }
+  }
+
+  @Test
+  public void testCanHandleErrorAndFailOverToReconstruction()
+      throws IOException {
+    int blockLength = 5 * ONEMB;
+    generateData(blockLength);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+    OmKeyLocationInfo blockInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+    ByteBuffer readBuffer = ByteBuffer.allocate(100);
+    DatanodeDetails badDN = blockInfo.getPipeline().getFirstNode();
+
+    dataGenerator = new SplittableRandom(randomSeed);
+    try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+      // Perform one read to get the stream created
+      int read = bis.read(readBuffer);
+      Assert.assertEquals(100, read);
+      ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+      // Setup an error to be thrown part through a read, so the dataBuffer
+      // will have been advanced by 50 bytes before the error. This tests it
+      // correctly rewinds and the same data is loaded again from the other
+      // stream.
+      streamFactory.getStreams().get(false).setShouldError(true, 151,
+          new BadDataLocationException(badDN, "Simulated Error"));
+      while(true) {
+        readBuffer.clear();
+        read = bis.read(readBuffer);
+        ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+        if (read < 100) {
+          break;
+        }
+      }
+      readBuffer.clear();
+      read = bis.read(readBuffer);
+      Assert.assertEquals(-1, read);
+      // Ensure the bad location was passed into the factory to create the
+      // reconstruction reader
+      Assert.assertEquals(badDN, streamFactory.getFailedLocations().get(0));
+    }
+  }
+
+  @Test
+  public void testCanSeekToNewPosition()

Review comment:
       Just format this method signature.

##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Factory class to create various BlockStream instances.
+ */
+public final class ECBlockInputStreamFactoryImpl implements
+    ECBlockInputStreamFactory {
+
+  private final BlockInputStreamFactory inputStreamFactory;
+
+  public static ECBlockInputStreamFactory getInstance(
+      BlockInputStreamFactory streamFactory) {
+    return new ECBlockInputStreamFactoryImpl(streamFactory);
+  }
+
+  private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory) 
{
+    this.inputStreamFactory = streamFactory;
+  }
+
+  /**
+   * Create a new EC InputStream based on the missingLocations boolean. If it 
is
+   * set to false, it indicates all locations are available and aa

Review comment:
       aa --> an

##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Top level class used to read data from EC Encoded blocks. This class 
decides,
+ * based on the block availability, whether to use a reconstruction or non
+ * reconstruction read and also handles errors from the non-reconstruction 
reads
+ * failing over to a reconstruction read when they happen.
+ */
+public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
+
+  private final ECReplicationConfig repConfig;
+  private final boolean verifyChecksum;
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+  private final OmKeyLocationInfo blockInfo;
+  private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
+
+  private BlockExtendedInputStream blockReader;
+  private boolean reconstructionReader = false;
+  private List<DatanodeDetails> failedLocations = new ArrayList<>();
+
+  /**
+   * Given the ECReplicationConfig and the block length, calculate how many
+   * data locations the block should have.
+   * @param repConfig The EC Replication Config
+   * @param blockLength The length of the data block in bytes
+   * @return The number of expected data locations
+   */
+  public static int expectedDataLocations(ECReplicationConfig repConfig,
+      long blockLength) {
+    return (int)Math.min(
+        Math.ceil((double)blockLength / repConfig.getEcChunkSize()),
+        repConfig.getData());
+  }
+
+  /**
+   * From ECReplicationConfig and Pipeline with the block locations and 
location
+   * indexes, determine the number of data locations available.
+   * @param repConfig The EC Replication Config
+   * @param pipeline The pipeline for the data block, givings its locations and
+   *                 the index of each location.
+   * @return The number of locations available
+   */
+  public static int availableDataLocations(ECReplicationConfig repConfig,
+      Pipeline pipeline) {
+    Set<Integer> locations = new HashSet<>();
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      int index = pipeline.getReplicaIndex(dn);
+      if (index > 0 && index <= repConfig.getData()) {
+        locations.add(index);
+      }
+    }
+    return locations.size();
+  }
+
+  public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory, Function<BlockID,
+      Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
+    this.repConfig = repConfig;
+    this.verifyChecksum = verifyChecksum;
+    this.blockInfo = blockInfo;
+    this.ecBlockInputStreamFactory = streamFactory;
+    this.xceiverClientFactory = xceiverClientFactory;
+    this.refreshFunction = refreshFunction;
+
+    setReaderType();
+    createBlockReader();
+  }
+
+  private synchronized void setReaderType() {
+    int expected = expectedDataLocations(repConfig, getLength());
+    int available = availableDataLocations(repConfig, blockInfo.getPipeline());
+    reconstructionReader = available < expected;
+  }
+
+  private void createBlockReader() {
+    blockReader = ecBlockInputStreamFactory.create(reconstructionReader,
+        failedLocations, repConfig, blockInfo, verifyChecksum,
+        xceiverClientFactory, refreshFunction);
+  }
+
+  @Override
+  public synchronized BlockID getBlockID() {
+    return blockInfo.getBlockID();
+  }
+
+  @Override
+  public synchronized long getRemaining() {
+    return blockReader.getRemaining();
+  }
+
+  @Override
+  public synchronized long getLength() {
+    return blockInfo.getLength();
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len)
+      throws IOException {
+    return read(ByteBuffer.wrap(b, off, len));
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    if (blockReader.getRemaining() == 0) {
+      return EOF;
+    }
+    int totalRead = 0;
+    long lastPosition = 0;
+    try {
+      while (buf.hasRemaining() && getRemaining() > 0) {
+        buf.mark();
+        lastPosition = blockReader.getPos();
+        totalRead += blockReader.read(buf);
+      }
+    } catch (IOException e) {
+      if (reconstructionReader) {
+        // If we get an error from the reconstruction reader, there
+        // is nothing left to try. It will re-try until it has insufficient
+        // locations internally, so if an error comes here, just re-throw it.
+        throw e;
+      }
+      if (e instanceof BadDataLocationException) {
+        failoverToReconstructionRead(
+            ((BadDataLocationException) e).getFailedLocation(), lastPosition);
+        buf.reset();
+        totalRead += read(buf);
+      } else {
+        throw e;
+      }
+    }
+    return totalRead;
+  }
+
+  private synchronized void failoverToReconstructionRead(
+      DatanodeDetails badLocation, long lastPosition) throws IOException {
+    if (badLocation != null) {
+      failedLocations.add(badLocation);
+    }
+    blockReader.close();
+    reconstructionReader = true;

Review comment:
       I am just wondering how about passing the type bool flag as argument to 
createBlockReader method? And we can save reconstructionReader in 
createBlockReader. Currently we are setting in two places, one is at ctor and 
other from exception block. Not a concern but bool flag modification can be in 
single place.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to