This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 636d8226827 HDFS-17811. EC: DFSStripedInputStream supports retrying just like DFSInputStream. (#7820). Contributed by hfutatzhanghb.
636d8226827 is described below

commit 636d822682715dbc05af9483e91a9f0ee72f83b8
Author: hfutatzhanghb <hfutzhan...@163.com>
AuthorDate: Wed Jul 30 20:18:54 2025 +0800

    HDFS-17811. EC: DFSStripedInputStream supports retrying just like DFSInputStream. (#7820). Contributed by hfutatzhanghb.

    Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org>
---
 .../apache/hadoop/hdfs/DFSClientFaultInjector.java |  3 +
 .../apache/hadoop/hdfs/DFSStripedInputStream.java  | 64 ++++++++++++++--------
 .../hadoop/hdfs/TestDFSStripedInputStream.java     | 62 +++++++++++++++++++++
 3 files changed, 105 insertions(+), 24 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 8150b6fb0e2..05ec2b18e95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -71,4 +72,6 @@ public void delayWhenRenewLeaseTimeout() {}
   public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {}
 
   public void failCreateBlockReader() throws InvalidBlockTokenException {}
+
+  public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {};
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d6131f8ddeb..fc6161625d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -395,37 +395,53 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
       throw new IOException("Stream closed");
     }
 
+    // Number of bytes already read into buffer.
+    int result = 0;
     int len = strategy.getTargetLength();
     CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     if (pos < getFileLength()) {
-      try {
-        if (pos > blockEnd) {
-          blockSeekTo(pos);
-        }
-        int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
-        synchronized (infoLock) {
-          if (locatedBlocks.isLastBlockComplete()) {
-            realLen = (int) Math.min(realLen,
-                locatedBlocks.getFileLength() - pos);
+      int retries = 2;
+      boolean isRetryRead = false;
+      while (retries > 0) {
+        try {
+          if (pos > blockEnd || isRetryRead) {
+            blockSeekTo(pos);
+          }
+          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+          synchronized (infoLock) {
+            if (locatedBlocks.isLastBlockComplete()) {
+              realLen = (int) Math.min(realLen,
+                  locatedBlocks.getFileLength() - pos);
+            }
           }
-        }
 
-        /** Number of bytes already read into buffer */
-        int result = 0;
-        while (result < realLen) {
-          if (!curStripeRange.include(getOffsetInBlockGroup())) {
-            readOneStripe(corruptedBlocks);
+          while (result < realLen) {
+            if (!curStripeRange.include(getOffsetInBlockGroup())) {
+              DFSClientFaultInjector.get().failWhenReadWithStrategy(isRetryRead);
+              readOneStripe(corruptedBlocks);
+            }
+            int ret = copyToTargetBuf(strategy, realLen - result);
+            result += ret;
+            pos += ret;
+            len -= ret;
+          }
+          return result;
+        } catch (IOException ioe) {
+          retries--;
+          if (retries > 0) {
+            DFSClient.LOG.info(
+                "DFSStripedInputStream read meets exception:{}, will retry again.",
+                ioe.toString());
+            isRetryRead = true;
+          } else {
+            throw ioe;
           }
-          int ret = copyToTargetBuf(strategy, realLen - result);
-          result += ret;
-          pos += ret;
+        } finally {
+          // Check if need to report block replicas corruption either read
+          // was successful or ChecksumException occurred.
+          reportCheckSumFailure(corruptedBlocks, getCurrentBlockLocationsLength(),
+              true);
         }
-        return result;
-      } finally {
-        // Check if need to report block replicas corruption either read
-        // was successful or ChecksumException occurred.
-        reportCheckSumFailure(corruptedBlocks, getCurrentBlockLocationsLength(),
-            true);
       }
     }
     return -1;
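In short, the patch wraps the stateful read path of DFSStripedInputStream in a bounded retry: on an IOException it re-seeks to the current position (rebuilding the block readers) and tries once more before rethrowing, while still reporting checksum failures after every attempt. A minimal, self-contained sketch of that retry pattern follows; StripeReader, readOnce and seekTo are illustrative stand-ins, not the actual Hadoop APIs.

    import java.io.IOException;

    // Sketch of the retry-on-failure read pattern; names are illustrative only.
    public class RetryReadSketch {

      interface StripeReader {
        void seekTo(long pos) throws IOException;  // re-establish readers at pos
        int readOnce(long pos) throws IOException; // one read attempt, returns bytes read
      }

      // Attempt the read up to two times; on the first IOException, re-seek and
      // retry, and only rethrow if the second attempt also fails.
      static int readWithRetry(StripeReader reader, long pos) throws IOException {
        int retries = 2;
        boolean isRetryRead = false;
        while (retries > 0) {
          try {
            if (isRetryRead) {
              reader.seekTo(pos); // rebuild block readers before retrying
            }
            return reader.readOnce(pos);
          } catch (IOException ioe) {
            retries--;
            if (retries > 0) {
              isRetryRead = true; // exactly one retry, mirroring retries = 2 above
            } else {
              throw ioe;
            }
          }
        }
        return -1; // not reached: the loop always returns or throws
      }
    }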
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index f9646e9ee16..60639555789 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -50,6 +52,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -735,4 +738,63 @@ public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
     assertEquals(rangesExpected, ranges);
   }
 
+  @Test
+  public void testStatefulReadRetryWhenMoreThanParityFailOnce() throws Exception {
+    HdfsConfiguration hdfsConf = new HdfsConfiguration();
+    String testBaseDir = "/testECRead";
+    String testfileName = "testfile";
+    DFSClientFaultInjector old = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(9).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path(testBaseDir);
+      assertTrue(dfs.mkdirs(dir));
+      dfs.enableErasureCodingPolicy("RS-6-3-1024k");
+      dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
+      assertEquals("RS-6-3-1024k", dfs.getErasureCodingPolicy(dir).getName());
+
+      int writeBufSize = 30 * 1024 * 1024 + 1;
+      byte[] writeBuf = new byte[writeBufSize];
+      try (FSDataOutputStream fsdos = dfs.create(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        Random random = new Random();
+        random.nextBytes(writeBuf);
+        fsdos.write(writeBuf, 0, writeBuf.length);
+        Thread.sleep(1000);
+      }
+      FileStatus fileStatus = dfs.getFileStatus(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName));
+      assertEquals(writeBufSize, fileStatus.getLen());
+
+      DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+        @Override
+        public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {
+          if (!isRetryRead) {
+            throw new IOException("Mock more than parity num blocks fail when readOneStripe.");
+          }
+        }
+      });
+
+      // We use unaligned buffer size to trigger some corner cases.
+      byte[] readBuf = new byte[4095];
+      byte[] totalReadBuf = new byte[writeBufSize]; // Buffer to store all read data
+      int ret = 0;
+      int totalReadBytes = 0;
+      try (FSDataInputStream fsdis = dfs.open(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        while((ret = fsdis.read(readBuf)) > 0) {
+          System.arraycopy(readBuf, 0, totalReadBuf, totalReadBytes, ret);
+          totalReadBytes += ret;
+        }
+
+        // Compare the read data with the original writeBuf.
+        assertEquals(writeBufSize, totalReadBytes, "Total bytes read should match writeBuf size");
+        assertArrayEquals(writeBuf, totalReadBuf, "Read data should match original write data");
+      }
+      assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
+    } finally {
+      DFSClientFaultInjector.set(old);
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org