This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 636d8226827 HDFS-17811. EC: DFSStripedInputStream supports retrying just like DFSInputStream. (#7820). Contributed by hfutatzhanghb.
636d8226827 is described below

commit 636d822682715dbc05af9483e91a9f0ee72f83b8
Author: hfutatzhanghb <hfutzhan...@163.com>
AuthorDate: Wed Jul 30 20:18:54 2025 +0800

    HDFS-17811. EC: DFSStripedInputStream supports retrying just like DFSInputStream. (#7820). Contributed by hfutatzhanghb.
    
    Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org>
---
 .../apache/hadoop/hdfs/DFSClientFaultInjector.java |  3 +
 .../apache/hadoop/hdfs/DFSStripedInputStream.java  | 64 ++++++++++++++--------
 .../hadoop/hdfs/TestDFSStripedInputStream.java     | 62 +++++++++++++++++++++
 3 files changed, 105 insertions(+), 24 deletions(-)
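
For callers the retry is transparent: a stateful read over an erasure-coded file that previously failed with an IOException on a single transient stripe-read error can now complete on the stream's second attempt. A minimal sketch of such a read (class name, path and configuration are illustrative, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class EcStatefulReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();       // assumes fs.defaultFS points at an HDFS cluster
    Path ecFile = new Path("/testECRead/testfile"); // a file stored under an EC policy such as RS-6-3-1024k
    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(ecFile)) {
      byte[] buf = new byte[4096];
      int n;
      long total = 0;
      while ((n = in.read(buf)) > 0) {
        // With this change, a transient failure while fetching one stripe is
        // retried once inside DFSStripedInputStream instead of surfacing here.
        total += n;
      }
      System.out.println("Read " + total + " bytes");
    }
  }
}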

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 8150b6fb0e2..05ec2b18e95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -71,4 +72,6 @@ public void delayWhenRenewLeaseTimeout() {}
   public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {}
 
   public void failCreateBlockReader() throws InvalidBlockTokenException {}
+
+  public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {};
 }
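
The hook added here follows the existing DFSClientFaultInjector pattern: production code calls a no-op method, and a test swaps in an injector that throws. A minimal sketch of that swap, assuming it lives in the org.apache.hadoop.hdfs package like the test added below (the class and interface names are placeholders; the real test inlines this directly):

package org.apache.hadoop.hdfs;

import java.io.IOException;

// Sketch only: run some read logic with an injector that fails the first
// (non-retry) stateful read attempt, then restore the original injector.
public final class InjectFirstReadFailure {

  public interface ThrowingAction {
    void run() throws Exception;
  }

  public static void run(ThrowingAction readLogic) throws Exception {
    DFSClientFaultInjector old = DFSClientFaultInjector.get();
    try {
      DFSClientFaultInjector.set(new DFSClientFaultInjector() {
        @Override
        public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {
          if (!isRetryRead) {
            throw new IOException("Injected failure on the first read attempt.");
          }
        }
      });
      readLogic.run();  // e.g. read an erasure-coded file end to end
    } finally {
      DFSClientFaultInjector.set(old);  // always restore the default injector
    }
  }
}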
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d6131f8ddeb..fc6161625d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -395,37 +395,53 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
       throw new IOException("Stream closed");
     }
 
+    // Number of bytes already read into buffer.
+    int result = 0;
     int len = strategy.getTargetLength();
     CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     if (pos < getFileLength()) {
-      try {
-        if (pos > blockEnd) {
-          blockSeekTo(pos);
-        }
-        int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
-        synchronized (infoLock) {
-          if (locatedBlocks.isLastBlockComplete()) {
-            realLen = (int) Math.min(realLen,
-                locatedBlocks.getFileLength() - pos);
+      int retries = 2;
+      boolean isRetryRead = false;
+      while (retries > 0) {
+        try {
+          if (pos > blockEnd || isRetryRead) {
+            blockSeekTo(pos);
+          }
+          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+          synchronized (infoLock) {
+            if (locatedBlocks.isLastBlockComplete()) {
+              realLen = (int) Math.min(realLen,
+                  locatedBlocks.getFileLength() - pos);
+            }
           }
-        }
 
-        /** Number of bytes already read into buffer */
-        int result = 0;
-        while (result < realLen) {
-          if (!curStripeRange.include(getOffsetInBlockGroup())) {
-            readOneStripe(corruptedBlocks);
+          while (result < realLen) {
+            if (!curStripeRange.include(getOffsetInBlockGroup())) {
+              DFSClientFaultInjector.get().failWhenReadWithStrategy(isRetryRead);
+              readOneStripe(corruptedBlocks);
+            }
+            int ret = copyToTargetBuf(strategy, realLen - result);
+            result += ret;
+            pos += ret;
+            len -= ret;
+          }
+          return result;
+        } catch (IOException ioe) {
+          retries--;
+          if (retries > 0) {
+            DFSClient.LOG.info(
+                "DFSStripedInputStream read meets exception:{}, will retry again.",
+                ioe.toString());
+            isRetryRead = true;
+          } else {
+            throw ioe;
           }
-          int ret = copyToTargetBuf(strategy, realLen - result);
-          result += ret;
-          pos += ret;
+        } finally {
+          // Check if need to report block replicas corruption either read
+          // was successful or ChecksumException occurred.
+          reportCheckSumFailure(corruptedBlocks, getCurrentBlockLocationsLength(),
+              true);
         }
-        return result;
-      } finally {
-        // Check if need to report block replicas corruption either read
-        // was successful or ChecksumException occurred.
-        reportCheckSumFailure(corruptedBlocks, getCurrentBlockLocationsLength(),
-            true);
       }
     }
     return -1;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index f9646e9ee16..60639555789 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -50,6 +52,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -735,4 +738,63 @@ public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
     assertEquals(rangesExpected, ranges);
   }
 
+  @Test
+  public void testStatefulReadRetryWhenMoreThanParityFailOnce() throws Exception {
+    HdfsConfiguration hdfsConf = new HdfsConfiguration();
+    String testBaseDir = "/testECRead";
+    String testfileName = "testfile";
+    DFSClientFaultInjector old = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(9).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path(testBaseDir);
+      assertTrue(dfs.mkdirs(dir));
+      dfs.enableErasureCodingPolicy("RS-6-3-1024k");
+      dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
+      assertEquals("RS-6-3-1024k", dfs.getErasureCodingPolicy(dir).getName());
+
+      int writeBufSize = 30 * 1024 * 1024 + 1;
+      byte[] writeBuf = new byte[writeBufSize];
+      try (FSDataOutputStream fsdos = dfs.create(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        Random random = new Random();
+        random.nextBytes(writeBuf);
+        fsdos.write(writeBuf, 0, writeBuf.length);
+        Thread.sleep(1000);
+      }
+      FileStatus fileStatus = dfs.getFileStatus(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName));
+      assertEquals(writeBufSize, fileStatus.getLen());
+
+      DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+        @Override
+        public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {
+          if (!isRetryRead) {
+            throw new IOException("Mock more than parity num blocks fail when readOneStripe.");
+          }
+        }
+      });
+
+      // We use unaligned buffer size to trigger some corner cases.
+      byte[] readBuf = new byte[4095];
+      byte[] totalReadBuf = new byte[writeBufSize]; // Buffer to store all read data
+      int ret = 0;
+      int totalReadBytes = 0;
+      try (FSDataInputStream fsdis = dfs.open(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        while((ret = fsdis.read(readBuf)) > 0) {
+          System.arraycopy(readBuf, 0, totalReadBuf, totalReadBytes, ret);
+          totalReadBytes += ret;
+        }
+
+        // Compare the read data with the original writeBuf.
+        assertEquals(writeBufSize, totalReadBytes, "Total bytes read should match writeBuf size");
+        assertArrayEquals(writeBuf, totalReadBuf, "Read data should match original write data");
+      }
+      assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
+    } finally {
+      DFSClientFaultInjector.set(old);
+    }
+  }
 }

