This is an automated email from the ASF dual-hosted git repository.

steveloughran pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 9e6c311a873 HADOOP-19863. Incorrect Vectored IO metrics from Local Filesystem. (#8447) (#8496)
9e6c311a873 is described below

commit 9e6c311a873926182963bd11c7e7962463a2ae5f
Author: Steve Loughran <[email protected]>
AuthorDate: Thu May 14 11:44:38 2026 +0100

    HADOOP-19863. Incorrect Vectored IO metrics from Local Filesystem. (#8447) (#8496)
    
    
    Contributed by Steve Loughran.
---
 .../org/apache/hadoop/fs/RawLocalFileSystem.java   | 39 +++++++---
 .../contract/AbstractContractVectoredReadTest.java | 11 +++
 .../localfs/TestLocalFSContractVectoredRead.java   | 90 ++++++++++++++++++----
 3 files changed, 113 insertions(+), 27 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index 3bd93a4f459..4298e606896 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -81,6 +81,7 @@
 import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS;
 import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_BYTES;
 import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_OPERATIONS;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS;
 import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES;
 import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_EXCEPTIONS;
 import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
@@ -158,7 +159,8 @@ class LocalFSFileInputStream extends FSInputStream 
implements
             STREAM_READ_EXCEPTIONS,
             STREAM_READ_SEEK_OPERATIONS,
             STREAM_READ_SKIP_OPERATIONS,
-            STREAM_READ_SKIP_BYTES)
+            STREAM_READ_SKIP_BYTES,
+            STREAM_READ_VECTORED_OPERATIONS)
         .build();
 
     /** Reference to the bytes read counter for slightly faster counting. */
@@ -225,8 +227,7 @@ public int read() throws IOException {
         int value = fis.read();
         if (value >= 0) {
           this.position++;
-          statistics.incrementBytesRead(1);
-          bytesRead.addAndGet(1);
+          recordBytesRead(1);
         }
         return value;
       } catch (IOException e) {                 // unexpected exception
@@ -243,8 +244,7 @@ public int read(byte[] b, int off, int len) throws 
IOException {
         int value = fis.read(b, off, len);
         if (value > 0) {
           this.position += value;
-          statistics.incrementBytesRead(value);
-          bytesRead.addAndGet(value);
+          recordBytesRead(value);
         }
         return value;
       } catch (IOException e) {                 // unexpected exception
@@ -252,7 +252,18 @@ public int read(byte[] b, int off, int len) throws 
IOException {
         throw new FSError(e);                   // assume native fs error
       }
     }
-    
+
+    /**
+     * Record bytes read in the filesystem statistics and the IOStatistics bytes read counter.
+     * @param count number of bytes read; values of zero or less are ignored.
+     */
+    private void recordBytesRead(final int count) {
+      if (count > 0) {
+        statistics.incrementBytesRead(count);
+        bytesRead.addAndGet(count);
+      }
+    }
+
     @Override
     public int read(long position, byte[] b, int off, int len)
       throws IOException {
@@ -266,8 +277,7 @@ public int read(long position, byte[] b, int off, int len)
       try {
         int value = fis.getChannel().read(bb, position);
         if (value > 0) {
-          statistics.incrementBytesRead(value);
-          ioStatistics.incrementCounter(STREAM_READ_BYTES, value);
+          recordBytesRead(value);
         }
         return value;
       } catch (IOException e) {
@@ -328,6 +338,7 @@ public void readVectored(List<? extends FileRange> ranges,
     public void readVectored(final List<? extends FileRange> ranges,
         final IntFunction<ByteBuffer> allocate,
         final Consumer<ByteBuffer> release) throws IOException {
+      ioStatistics.incrementCounter(STREAM_READ_VECTORED_OPERATIONS);
 
       // Validate, but do not pass in a file length as it may change.
       List<? extends FileRange> sortedRanges = sortRangeList(ranges);
@@ -341,7 +352,8 @@ public void readVectored(final List<? extends FileRange> 
ranges,
       // Initiate the asynchronous reads.
       new AsyncHandler(getAsyncChannel(),
           sortedRanges,
-          pool)
+          pool,
+          this::recordBytesRead)
           .initiateRead();
     }
   }
@@ -372,20 +384,25 @@ private static class AsyncHandler implements 
CompletionHandler<Integer, Integer>
     /** Buffers being read. */
     private final ByteBuffer[] buffers;
 
+    /** Callback to update statistics; passed the length of each range once its read finishes. */
+    private final Consumer<Integer> statisticsUpdater;
+
     /**
      * Instantiate.
      * @param channel open channel.
      * @param ranges ranges to read.
      * @param allocateRelease pool for allocating buffers, and releasing on failure
+     * @param statisticsUpdater callback passed the length of each range once its read finishes.
      */
     AsyncHandler(
         final AsynchronousFileChannel channel,
         final List<? extends FileRange> ranges,
-        final ByteBufferPool allocateRelease) {
+        final ByteBufferPool allocateRelease, final Consumer<Integer> 
statisticsUpdater) {
       this.channel = channel;
       this.ranges = ranges;
       this.buffers = new ByteBuffer[ranges.size()];
       this.allocateRelease = allocateRelease;
+      this.statisticsUpdater = statisticsUpdater;
     }
 
     /**
@@ -426,6 +443,8 @@ public void completed(Integer result, Integer rangeIndex) {
           // issue a read for the rest of the buffer
           channel.read(buffer, range.getOffset() + buffer.position(), 
rangeIndex, this);
         } else {
+          // read finished
+          statisticsUpdater.accept(range.getLength());
           // Flip the buffer and declare success.
           buffer.flip();
           range.getData().complete(buffer);
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index 248333da9c9..5a70c943320 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -206,10 +206,21 @@ public void testVectoredReadMultipleRanges() throws 
Exception {
       combinedFuture.get();
 
       validateVectoredReadResult(fileRanges, DATASET, 0);
+      assertionsWithinTestVectoredReadMultipleRanges(in, fileRanges);
       returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
+  /**
+   * Hook for subclass assertions within {@link #testVectoredReadMultipleRanges()}.
+   * @param in active input stream.
+   * @param fileRanges ranges of files read.
+   */
+  protected void assertionsWithinTestVectoredReadMultipleRanges(final 
FSDataInputStream in,
+      final List<FileRange> fileRanges) {
+    // no-op by default; invoked after the read results have been validated.
+  }
+
   @Test
   public void testVectoredReadAndReadFully()  throws Exception {
     List<FileRange> fileRanges = new ArrayList<>();
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
index e2a9bfb444a..ff5e4bc5ee9 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
@@ -21,31 +21,39 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedClass;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.apache.hadoop.fs.statistics.IOStatistics;
 
 import static 
org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assertions.assertThat;
 
-@ParameterizedClass(name="buffer-{0}")
+@ParameterizedClass(name = "buffer-{0}")
 @MethodSource("params")
 public class TestLocalFSContractVectoredRead extends 
AbstractContractVectoredReadTest {
 
+  private long initialBytesRead;
+
   public TestLocalFSContractVectoredRead(final String bufferType) {
     super(bufferType);
   }
@@ -87,28 +95,28 @@ public void 
testChecksumValidationDuringVectoredReadSmallFile() throws Exception
    * @throws Exception any exception other than ChecksumException
    */
   private void validateCheckReadException(Path testPath,
-                                          int length,
-                                          List<FileRange> ranges) throws 
Exception {
+      int length,
+      List<FileRange> ranges) throws Exception {
     LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
     final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
-    try (FSDataOutputStream out = localFs.create(testPath, true)){
+    try (FSDataOutputStream out = localFs.create(testPath, true)) {
       out.write(datasetCorrect);
     }
     Path checksumPath = localFs.getChecksumFile(testPath);
     Assertions.assertThat(localFs.exists(checksumPath))
-            .describedAs("Checksum file should be present")
-            .isTrue();
+        .describedAs("Checksum file should be present")
+        .isTrue();
     CompletableFuture<FSDataInputStream> fis = 
localFs.openFile(testPath).build();
-    try (FSDataInputStream in = fis.get()){
+    try (FSDataInputStream in = fis.get()) {
       in.readVectored(ranges, getAllocate());
       validateVectoredReadResult(ranges, datasetCorrect, 0);
     }
     final byte[] datasetCorrupted = ContractTestUtils.dataset(length, 'a', 64);
-    try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){
+    try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)) {
       out.write(datasetCorrupted);
     }
     CompletableFuture<FSDataInputStream> fisN = 
localFs.openFile(testPath).build();
-    try (FSDataInputStream in = fisN.get()){
+    try (FSDataInputStream in = fisN.get()) {
       in.readVectored(ranges, getAllocate());
       // Expect checksum exception when data is updated directly through
       // raw local fs instance.
@@ -123,20 +131,68 @@ public void tesChecksumVectoredReadBoundaries() throws 
Exception {
     final int length = 1071;
     LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
     final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
-    try (FSDataOutputStream out = localFs.create(testPath, true)){
+    try (FSDataOutputStream out = localFs.create(testPath, true)) {
       out.write(datasetCorrect);
     }
     Path checksumPath = localFs.getChecksumFile(testPath);
     Assertions.assertThat(localFs.exists(checksumPath))
-            .describedAs("Checksum file should be present at {} ", 
checksumPath)
-            .isTrue();
+        .describedAs("Checksum file should be present at {} ", checksumPath)
+        .isTrue();
     CompletableFuture<FSDataInputStream> fis = 
localFs.openFile(testPath).build();
     List<FileRange> smallRange = new ArrayList<>();
     smallRange.add(FileRange.createFileRange(1000, 71));
-    try (FSDataInputStream in = fis.get()){
+    try (FSDataInputStream in = fis.get()) {
       in.readVectored(smallRange, getAllocate());
       validateVectoredReadResult(smallRange, datasetCorrect, 0);
     }
   }
 
+  /**
+   * Overridden so that the bytes read count can be cached before the test run.
+   */
+  @Test
+  @Override
+  public void testVectoredReadMultipleRanges() throws Exception {
+    initialBytesRead = getBytesRead();
+    super.testVectoredReadMultipleRanges();
+  }
+
+  /**
+   * Validate stream and filesystem statistics after the vectored read.
+   * The tests sometimes failed with more bytes read than expected, so the byte counts use
+   * {@code isGreaterThanOrEqualTo()}; only the vectored operation count is asserted exactly.
+   */
+  @Override
+  protected void assertionsWithinTestVectoredReadMultipleRanges(
+      final FSDataInputStream in,
+      final List<FileRange> fileRanges) {
+
+    // check the iostats
+    final long totalVectorReadLength = 
fileRanges.stream().mapToLong(FileRange::getLength).sum();
+    final IOStatistics stats = in.getIOStatistics();
+    assertThatStatisticCounter(stats, STREAM_READ_VECTORED_OPERATIONS)
+        .describedAs(STREAM_READ_VECTORED_OPERATIONS + " stream %s", stats)
+        .isEqualTo(1);
+    assertThatStatisticCounter(stats, STREAM_READ_BYTES)
+        .describedAs(STREAM_READ_BYTES + " in bytes read in stream %s", stats)
+        .isGreaterThanOrEqualTo(totalVectorReadLength);
+
+    // validate that the filesystem stats went up by at least that amount
+    // since the value cached before the test; other reads (crc files in particular) also count.
+    long currentBytesRead = getBytesRead();
+    assertThat(currentBytesRead)
+        .describedAs("bytes read in stream %s", in)
+        .isGreaterThanOrEqualTo(initialBytesRead + totalVectorReadLength);
+  }
+
+  /**
+   * Sum the bytes read over every entry returned by {@code FileSystem.getAllStatistics()}.
+   * That API is deprecated, but Spark uses it, and it's how the regression was found.
+   * This is how the production code looks at our stats.
+   * @return counter of bytes read across all stores. Never reset.
+   */
+  private static long getBytesRead() {
+    AtomicLong bytes = new AtomicLong();
+    FileSystem.getAllStatistics().forEach(st -> 
bytes.addAndGet(st.getBytesRead()));
+    return bytes.get();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to