This is an automated email from the ASF dual-hosted git repository.
steveloughran pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 734dd8a67cd HADOOP-19863. Incorrect Vectored IO metrics from Local
Filesystem. (#8447)
734dd8a67cd is described below
commit 734dd8a67cd6df56b59ff75aa43de57834a0d248
Author: Steve Loughran <[email protected]>
AuthorDate: Wed May 13 19:15:06 2026 +0100
HADOOP-19863. Incorrect Vectored IO metrics from Local Filesystem. (#8447)
Contributed by Steve Loughran.
---
.../org/apache/hadoop/fs/RawLocalFileSystem.java | 39 +++++++---
.../contract/AbstractContractVectoredReadTest.java | 11 +++
.../localfs/TestLocalFSContractVectoredRead.java | 90 ++++++++++++++++++----
3 files changed, 113 insertions(+), 27 deletions(-)
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index 3bd93a4f459..4298e606896 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -81,6 +81,7 @@
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_BYTES;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_OPERATIONS;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_EXCEPTIONS;
import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
@@ -158,7 +159,8 @@ class LocalFSFileInputStream extends FSInputStream
implements
STREAM_READ_EXCEPTIONS,
STREAM_READ_SEEK_OPERATIONS,
STREAM_READ_SKIP_OPERATIONS,
- STREAM_READ_SKIP_BYTES)
+ STREAM_READ_SKIP_BYTES,
+ STREAM_READ_VECTORED_OPERATIONS)
.build();
/** Reference to the bytes read counter for slightly faster counting. */
@@ -225,8 +227,7 @@ public int read() throws IOException {
int value = fis.read();
if (value >= 0) {
this.position++;
- statistics.incrementBytesRead(1);
- bytesRead.addAndGet(1);
+ recordBytesRead(1);
}
return value;
} catch (IOException e) { // unexpected exception
@@ -243,8 +244,7 @@ public int read(byte[] b, int off, int len) throws
IOException {
int value = fis.read(b, off, len);
if (value > 0) {
this.position += value;
- statistics.incrementBytesRead(value);
- bytesRead.addAndGet(value);
+ recordBytesRead(value);
}
return value;
} catch (IOException e) { // unexpected exception
@@ -252,7 +252,18 @@ public int read(byte[] b, int off, int len) throws
IOException {
throw new FSError(e); // assume native fs error
}
}
-
+
+ /**
+ * Record bytes read in both the filesystem statistics and the stream bytes-read counter.
+ * @param count number of bytes read; a value of zero or less is ignored.
+ */
+ private void recordBytesRead(final int count) {
+ if (count > 0) {
+ statistics.incrementBytesRead(count);
+ bytesRead.addAndGet(count);
+ }
+ }
+
@Override
public int read(long position, byte[] b, int off, int len)
throws IOException {
@@ -266,8 +277,7 @@ public int read(long position, byte[] b, int off, int len)
try {
int value = fis.getChannel().read(bb, position);
if (value > 0) {
- statistics.incrementBytesRead(value);
- ioStatistics.incrementCounter(STREAM_READ_BYTES, value);
+ recordBytesRead(value);
}
return value;
} catch (IOException e) {
@@ -328,6 +338,7 @@ public void readVectored(List<? extends FileRange> ranges,
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
+ ioStatistics.incrementCounter(STREAM_READ_VECTORED_OPERATIONS);
// Validate, but do not pass in a file length as it may change.
List<? extends FileRange> sortedRanges = sortRangeList(ranges);
@@ -341,7 +352,8 @@ public void readVectored(final List<? extends FileRange>
ranges,
// Initiate the asynchronous reads.
new AsyncHandler(getAsyncChannel(),
sortedRanges,
- pool)
+ pool,
+ this::recordBytesRead)
.initiateRead();
}
}
@@ -372,20 +384,25 @@ private static class AsyncHandler implements
CompletionHandler<Integer, Integer>
/** Buffers being read. */
private final ByteBuffer[] buffers;
+ /** Callback invoked with the length of each completed range, to update statistics. */
+ private final Consumer<Integer> statisticsUpdater;
+
/**
* Instantiate.
* @param channel open channel.
* @param ranges ranges to read.
* @param allocateRelease pool for allocating buffers, and releasing on
failure
+ * @param statisticsUpdater callback to update statistics.
*/
AsyncHandler(
final AsynchronousFileChannel channel,
final List<? extends FileRange> ranges,
- final ByteBufferPool allocateRelease) {
+ final ByteBufferPool allocateRelease, final Consumer<Integer>
statisticsUpdater) {
this.channel = channel;
this.ranges = ranges;
this.buffers = new ByteBuffer[ranges.size()];
this.allocateRelease = allocateRelease;
+ this.statisticsUpdater = statisticsUpdater;
}
/**
@@ -426,6 +443,8 @@ public void completed(Integer result, Integer rangeIndex) {
// issue a read for the rest of the buffer
channel.read(buffer, range.getOffset() + buffer.position(),
rangeIndex, this);
} else {
+ // read finished
+ statisticsUpdater.accept(range.getLength());
// Flip the buffer and declare success.
buffer.flip();
range.getData().complete(buffer);
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index 248333da9c9..5a70c943320 100644
---
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -206,10 +206,21 @@ public void testVectoredReadMultipleRanges() throws
Exception {
combinedFuture.get();
validateVectoredReadResult(fileRanges, DATASET, 0);
+ assertionsWithinTestVectoredReadMultipleRanges(in, fileRanges);
returnBuffersToPoolPostRead(fileRanges, pool);
}
}
+ /**
+ * Hook for subclasses to add assertions within {@link #testVectoredReadMultipleRanges()}.
+ * @param in input stream on which the vectored read was issued.
+ * @param fileRanges ranges which were read; their data has already been validated.
+ */
+ protected void assertionsWithinTestVectoredReadMultipleRanges(final FSDataInputStream in,
+ final List<FileRange> fileRanges) {
+ // no-op by default.
+ }
+
@Test
public void testVectoredReadAndReadFully() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
index e2a9bfb444a..ff5e4bc5ee9 100644
---
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
@@ -21,31 +21,39 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.MethodSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedClass;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.apache.hadoop.fs.statistics.IOStatistics;
import static
org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assertions.assertThat;
-@ParameterizedClass(name="buffer-{0}")
+@ParameterizedClass(name = "buffer-{0}")
@MethodSource("params")
public class TestLocalFSContractVectoredRead extends
AbstractContractVectoredReadTest {
+ private long initialBytesRead;
+
public TestLocalFSContractVectoredRead(final String bufferType) {
super(bufferType);
}
@@ -87,28 +95,28 @@ public void
testChecksumValidationDuringVectoredReadSmallFile() throws Exception
* @throws Exception any exception other than ChecksumException
*/
private void validateCheckReadException(Path testPath,
- int length,
- List<FileRange> ranges) throws
Exception {
+ int length,
+ List<FileRange> ranges) throws Exception {
LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
- try (FSDataOutputStream out = localFs.create(testPath, true)){
+ try (FSDataOutputStream out = localFs.create(testPath, true)) {
out.write(datasetCorrect);
}
Path checksumPath = localFs.getChecksumFile(testPath);
Assertions.assertThat(localFs.exists(checksumPath))
- .describedAs("Checksum file should be present")
- .isTrue();
+ .describedAs("Checksum file should be present")
+ .isTrue();
CompletableFuture<FSDataInputStream> fis =
localFs.openFile(testPath).build();
- try (FSDataInputStream in = fis.get()){
+ try (FSDataInputStream in = fis.get()) {
in.readVectored(ranges, getAllocate());
validateVectoredReadResult(ranges, datasetCorrect, 0);
}
final byte[] datasetCorrupted = ContractTestUtils.dataset(length, 'a', 64);
- try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){
+ try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)) {
out.write(datasetCorrupted);
}
CompletableFuture<FSDataInputStream> fisN =
localFs.openFile(testPath).build();
- try (FSDataInputStream in = fisN.get()){
+ try (FSDataInputStream in = fisN.get()) {
in.readVectored(ranges, getAllocate());
// Expect checksum exception when data is updated directly through
// raw local fs instance.
@@ -123,20 +131,68 @@ public void tesChecksumVectoredReadBoundaries() throws
Exception {
final int length = 1071;
LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
- try (FSDataOutputStream out = localFs.create(testPath, true)){
+ try (FSDataOutputStream out = localFs.create(testPath, true)) {
out.write(datasetCorrect);
}
Path checksumPath = localFs.getChecksumFile(testPath);
Assertions.assertThat(localFs.exists(checksumPath))
- .describedAs("Checksum file should be present at {} ",
checksumPath)
- .isTrue();
+ .describedAs("Checksum file should be present at {} ", checksumPath)
+ .isTrue();
CompletableFuture<FSDataInputStream> fis =
localFs.openFile(testPath).build();
List<FileRange> smallRange = new ArrayList<>();
smallRange.add(FileRange.createFileRange(1000, 71));
- try (FSDataInputStream in = fis.get()){
+ try (FSDataInputStream in = fis.get()) {
in.readVectored(smallRange, getAllocate());
validateVectoredReadResult(smallRange, datasetCorrect, 0);
}
}
+ /**
+ * Override to capture the global bytes-read count before the test runs, for later assertions.
+ */
+ @Test
+ @Override
+ public void testVectoredReadMultipleRanges() throws Exception {
+ initialBytesRead = getBytesRead();
+ super.testVectoredReadMultipleRanges();
+ }
+
+ /**
+ * Validate the stream IOStatistics and the global filesystem statistics.
+ * More bytes than requested may be counted, so byte counts are asserted with
+ * {@code isGreaterThanOrEqualTo()} rather than exact values.
+ * @param in input stream on which the vectored read was issued.
+ * @param fileRanges ranges which were read.
+ */
+ @Override
+ protected void assertionsWithinTestVectoredReadMultipleRanges(
+ final FSDataInputStream in,
+ final List<FileRange> fileRanges) {
+ // check the stream IOStatistics.
+ final long totalVectorReadLength = fileRanges.stream().mapToLong(FileRange::getLength).sum();
+ final IOStatistics stats = in.getIOStatistics();
+ assertThatStatisticCounter(stats, STREAM_READ_VECTORED_OPERATIONS)
+ .describedAs("%s in stream %s", STREAM_READ_VECTORED_OPERATIONS, stats)
+ .isEqualTo(1);
+ assertThatStatisticCounter(stats, STREAM_READ_BYTES)
+ .describedAs("%s in stream %s", STREAM_READ_BYTES, stats)
+ .isGreaterThanOrEqualTo(totalVectorReadLength);
+
+ // filesystem-wide stats rise by at least that amount; they also count crc file reads.
+ final long currentBytesRead = getBytesRead();
+ assertThat(currentBytesRead)
+ .describedAs("FileSystem statistics bytes read after vectored read from %s", in)
+ .isGreaterThanOrEqualTo(initialBytesRead + totalVectorReadLength);
+ }
+
+ /**
+ * Sum the bytes read from all filesystem statistics via a deprecated API which Spark uses.
+ * This is how production code observes these statistics, and how the regression was found.
+ * @return counter of bytes read across all stores. Never reset.
+ */
+ private static long getBytesRead() {
+ final AtomicLong bytes = new AtomicLong();
+ FileSystem.getAllStatistics().forEach(st -> bytes.addAndGet(st.getBytesRead()));
+ return bytes.get();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]