This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 65268cc2c1 HDDS-7918. EC: ECBlockReconstructedStripeInputStream should check for spare replicas before failing an index (#4441)
65268cc2c1 is described below

commit 65268cc2c1aa90086a7dba15e55a46c93962ec24
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Mar 21 16:00:12 2023 +0100

    HDDS-7918. EC: ECBlockReconstructedStripeInputStream should check for spare replicas before failing an index (#4441)
---
 .../ozone/client/io/BadDataLocationException.java  |  9 ++-
 .../hadoop/ozone/client/io/ECBlockInputStream.java |  4 +-
 .../io/ECBlockReconstructedStripeInputStream.java  | 26 ++++++++
 .../hadoop/ozone/client/io/ECStreamTestUtil.java   |  9 +--
 .../TestECBlockReconstructedStripeInputStream.java | 76 +++++++++++++++++++---
 5 files changed, 108 insertions(+), 16 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
index 65e124eaf5..5046a44895 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
@@ -29,7 +29,7 @@ import java.util.List;
  */
 public class BadDataLocationException extends IOException {
 
-  private List<DatanodeDetails> failedLocations = new ArrayList<>();
+  private final List<DatanodeDetails> failedLocations = new ArrayList<>();
   private int failedLocationIndex;
 
   public BadDataLocationException(DatanodeDetails dn) {
@@ -53,6 +53,13 @@ public class BadDataLocationException extends IOException {
     failedLocations.add(dn);
   }
 
+  public BadDataLocationException(int failedIndex,
+      Throwable ex, List<DatanodeDetails> failedLocations) {
+    super(ex);
+    failedLocationIndex = failedIndex;
+    this.failedLocations.addAll(failedLocations);
+  }
+
   public BadDataLocationException(DatanodeDetails dn, int failedIndex,
       Throwable ex) {
     super(ex);
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index dc354198ca..8ad8f8851e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -334,7 +334,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
     }
   }
 
-  private boolean shouldRetryFailedRead(int failedIndex) {
+  protected boolean shouldRetryFailedRead(int failedIndex) {
     Deque<DatanodeDetails> spareLocations = 
spareDataLocations.get(failedIndex);
     if (spareLocations != null && spareLocations.size() > 0) {
       failedLocations.add(dataLocations[failedIndex]);
@@ -470,7 +470,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
     seeked = true;
   }
 
-  private void closeStream(int i) {
+  protected void closeStream(int i) {
     if (blockStreams[i] != null) {
       try {
         blockStreams[i].close();
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 9658fb784d..492bf33a10 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
@@ -610,6 +611,31 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   }
 
   private void readIntoBuffer(int ind, ByteBuffer buf) throws IOException {
+    List<DatanodeDetails> failedLocations = new LinkedList<>();
+    while (true) {
+      int currentBufferPosition = buf.position();
+      try {
+        readFromCurrentLocation(ind, buf);
+        break;
+      } catch (IOException e) {
+        DatanodeDetails failedLocation = getDataLocations()[ind];
+        failedLocations.add(failedLocation);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: read [{}] failed from {} due to {}", this,
+              ind, failedLocation, e.getMessage());
+        }
+        closeStream(ind);
+        if (shouldRetryFailedRead(ind)) {
+          buf.position(currentBufferPosition);
+        } else {
+          throw new BadDataLocationException(ind, e, failedLocations);
+        }
+      }
+    }
+  }
+
+  private void readFromCurrentLocation(int ind, ByteBuffer buf)
+      throws IOException {
     BlockExtendedInputStream stream = getOrOpenStream(ind);
     seekStreamIfNecessary(stream, 0);
     while (buf.hasRemaining()) {
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
index 0fe5886f1b..db08bd7343 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
@@ -223,7 +223,7 @@ public final class ECStreamTestUtil {
         new LinkedHashMap<>();
     private List<ByteBuffer> blockStreamData;
     // List of EC indexes that should fail immediately on read
-    private List<Integer> failIndexes = new ArrayList<>();
+    private final List<Integer> failIndexes = new ArrayList<>();
 
     private Pipeline currentPipeline;
 
@@ -249,8 +249,9 @@ public final class ECStreamTestUtil {
       this.currentPipeline = pipeline;
     }
 
-    public synchronized void setFailIndexes(List<Integer> fail) {
-      failIndexes.addAll(fail);
+    // fail each index in the list once
+    public synchronized void setFailIndexes(Integer... fail) {
+      failIndexes.addAll(Arrays.asList(fail));
     }
 
     public synchronized BlockExtendedInputStream create(
@@ -264,7 +265,7 @@ public final class ECStreamTestUtil {
       TestBlockInputStream stream = new TestBlockInputStream(
           blockInfo.getBlockID(), blockInfo.getLength(),
           blockStreamData.get(repInd - 1), repInd);
-      if (failIndexes.contains(repInd)) {
+      if (failIndexes.remove(Integer.valueOf(repInd))) {
         stream.setShouldError(true);
       }
       blockStreams.put(repInd, stream);
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
index 7ad6d3e185..62d8c2d760 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
@@ -489,6 +490,71 @@ public class TestECBlockReconstructedStripeInputStream {
     }
   }
 
+  @Test
+  void testNoErrorIfSpareLocationToRead() throws IOException {
+    int chunkSize = repConfig.getEcChunkSize();
+    int blockLength = chunkSize * 3 - 1;
+
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+    ECStreamTestUtil
+        .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, 
blockLength);
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+    // We have a length that is less than a stripe, so chunks 1 and 2 are full.
+    // Block 1 is lost and needs recovered
+    // from the parity and padded blocks 2 and 3.
+
+    List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
+    // Two data missing
+    locations.add(ECStreamTestUtil.createIndexMap(3, 4, 5));
+    // Two data missing
+    locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
+    // One data missing - the last one
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 5));
+    // One data and one parity missing
+    locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
+    // One data and one parity missing
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4));
+    // No indexes missing
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
+
+    DatanodeDetails spare = MockDatanodeDetails.randomDatanodeDetails();
+
+    for (Map<DatanodeDetails, Integer> dnMap : locations) {
+      streamFactory = new TestBlockInputStreamFactory();
+      addDataStreamsToFactory(dataBufs, parity);
+      ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+
+      // this index fails, but has spare replica
+      int failing = dnMap.values().iterator().next();
+      streamFactory.setFailIndexes(failing);
+      dnMap.put(spare, failing);
+
+      BlockLocationInfo keyInfo =
+          ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+      streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+      dataGen = new SplittableRandom(randomSeed);
+      try (ECBlockReconstructedStripeInputStream ecb =
+               createInputStream(keyInfo)) {
+        int read = ecb.read(bufs);
+        Assertions.assertEquals(blockLength, read);
+        ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+        ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
+        ECStreamTestUtil.assertBufferMatches(bufs[2], dataGen);
+        // Check the underlying streams have been advanced by 1 chunk:
+        for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+          Assertions.assertEquals(0, bis.getRemaining());
+        }
+        Assertions.assertEquals(ecb.getPos(), blockLength);
+        clearBuffers(bufs);
+        // A further read should give EOF
+        read = ecb.read(bufs);
+        Assertions.assertEquals(-1, read);
+      }
+    }
+  }
+
   @Test
   public void testSeek() throws IOException {
     // Generate the input data for 3 full stripes and generate the parity
@@ -688,7 +754,7 @@ public class TestECBlockReconstructedStripeInputStream {
     streamFactory = new TestBlockInputStreamFactory();
     addDataStreamsToFactory(dataBufs, parity);
     // Fail all the indexes containing data on their first read.
-    streamFactory.setFailIndexes(indexesToList(1, 4, 5));
+    streamFactory.setFailIndexes(1, 4, 5);
     // The locations contain the padded indexes, as will often be the case
     // when containers are reported by SCM.
     Map<DatanodeDetails, Integer> dnMap =
@@ -759,14 +825,6 @@ public class TestECBlockReconstructedStripeInputStream {
         null, null, streamFactory, bufferPool, ecReconstructExecutor);
   }
 
-  private List<Integer> indexesToList(int... indexes) {
-    List<Integer> list = new ArrayList<>();
-    for (int i : indexes) {
-      list.add(i);
-    }
-    return list;
-  }
-
   private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) 
{
     List<ByteBuffer> dataStreams = new ArrayList<>();
     for (ByteBuffer b : data) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to