This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 7be4f85c8d7 HADOOP-19303. VectorIO API: support pass-down of a release() operator (#7418)
7be4f85c8d7 is described below

commit 7be4f85c8d7f55876b731fdecc9fc7e2668ab86f
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Mon Mar 3 14:41:40 2025 +0000

    HADOOP-19303. VectorIO API: support pass-down of a release() operator (#7418)

    The PositionedReadable vector IO API has a new readVectored() method
    which takes a release operator as its third argument.

      readVectored(List<? extends FileRange> ranges,
          IntFunction<ByteBuffer> allocate,
          Consumer<ByteBuffer> release)

    This allows buffers to be returned to pools even when a read fails.

    The default implementation delegates to readVectored/2, so that
    existing custom implementations of that method will still be invoked.

    Contributed by Steve Loughran
---
 .../apache/hadoop/fs/BufferedFSInputStream.java    |   8 ++
 .../org/apache/hadoop/fs/ChecksumFileSystem.java   |  12 +-
 .../org/apache/hadoop/fs/FSDataInputStream.java    |   8 ++
 .../org/apache/hadoop/fs/PositionedReadable.java   |  29 +++++
 .../org/apache/hadoop/fs/RawLocalFileSystem.java   | 127 +++++++++++++++------
 .../org/apache/hadoop/fs/VectoredReadUtils.java    |  55 ++++++++-
 .../apache/hadoop/fs/impl/VectorIOBufferPool.java  |  80 +++++++++++++
 .../site/markdown/filesystem/fsdatainputstream.md  |  25 ++++
 .../contract/AbstractContractVectoredReadTest.java |  65 ++++++++---
 .../hadoop/fs/impl/TestVectoredReadUtils.java      |  48 ++++++++
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   |  24 +++-
 11 files changed, 427 insertions(+), 54 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java
index 7f3171235c8..4e5850bcf3d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java
@@ -24,6 +24,7 @@
 import java.util.StringJoiner;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -181,4 +182,11 @@ public void readVectored(List<? extends FileRange> ranges,
       IntFunction<ByteBuffer> allocate) throws IOException {
     ((PositionedReadable) in).readVectored(ranges, allocate);
   }
+
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+      final IntFunction<ByteBuffer> allocate,
+      final Consumer<ByteBuffer> release) throws IOException {
+    ((PositionedReadable) in).readVectored(ranges, allocate, release);
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 4171c8f13e2..45b3f90feaa 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -32,6 +32,7 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 import java.util.zip.CRC32;
 
@@ -438,6 +439,13 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
 
   @Override
   public void readVectored(List<? extends FileRange> ranges,
       IntFunction<ByteBuffer> allocate) throws IOException {
+    readVectored(ranges, allocate, (b) -> { });
+  }
+
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+      final IntFunction<ByteBuffer> allocate,
+      final Consumer<ByteBuffer> release) throws IOException {
     // If the stream doesn't have checksums, just delegate.
     if (sums == null) {
@@ -462,8 +470,8 @@ public void readVectored(List<? extends FileRange> ranges,
     }
     List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
         bytesPerSum, minSeek, maxSize);
-    sums.readVectored(checksumRanges, allocate);
-    datas.readVectored(dataRanges, allocate);
+    sums.readVectored(checksumRanges, allocate, release);
+    datas.readVectored(dataRanges, allocate, release);
     for(CombinedFileRange checksumRange: checksumRanges) {
       for(FileRange dataRange: checksumRange.getUnderlying()) {
         // when we have both the ranges, validate the checksum
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index fc36b5bd6d6..2953a050135 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -27,6 +27,7 @@
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -306,4 +307,11 @@ public void readVectored(List<? extends FileRange> ranges,
       IntFunction<ByteBuffer> allocate) throws IOException {
     ((PositionedReadable) in).readVectored(ranges, allocate);
   }
+
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+      final IntFunction<ByteBuffer> allocate,
+      final Consumer<ByteBuffer> release) throws IOException {
+    ((PositionedReadable) in).readVectored(ranges, allocate, release);
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java
index 8762bedb17a..40cd658c55f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java
@@ -21,11 +21,13 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.io.Sizes.S_16K;
 import static org.apache.hadoop.io.Sizes.S_1M;
 
@@ -136,4 +138,31 @@ default void readVectored(List<? extends FileRange> ranges,
       IntFunction<ByteBuffer> allocate) throws IOException {
     VectoredReadUtils.readVectored(this, ranges, allocate);
   }
+
+  /**
+   * Extension of {@link #readVectored(List, IntFunction)} where a {@code release(buffer)}
+   * operation may be invoked if problems surface during reads.
+   * <p>
+   * The {@code release} operation is invoked after an IOException
+   * to return the active buffer to a pool before reporting a failure
+   * in the returned future.
+   * <p>
+   * The default implementation calls {@link #readVectored(List, IntFunction)}.
+   * <p>
+   * Implementations SHOULD override this method if they can release buffers as
+   * part of their error handling.
+   * @param ranges the byte ranges to read
+   * @param allocate function to allocate ByteBuffer
+   * @param release callable to release a ByteBuffer.
+   * @throws IOException any IOE.
+   * @throws IllegalArgumentException if any of the ranges are invalid, or they overlap.
+   * @throws NullPointerException if an argument is null.
+   */
+  default void readVectored(List<? extends FileRange> ranges,
+      IntFunction<ByteBuffer> allocate,
+      Consumer<ByteBuffer> release) throws IOException {
+    requireNonNull(release);
+    readVectored(ranges, allocate);
+  }
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index fa5624e6715..d5f545b460d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -49,12 +49,14 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.impl.StoreImplementationUtils;
+import org.apache.hadoop.fs.impl.VectorIOBufferPool;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
@@ -62,12 +64,14 @@
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
+import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
 import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
 import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -319,74 +323,131 @@ AsynchronousFileChannel getAsyncChannel() throws IOException {
   @Override
   public void readVectored(List<? extends FileRange> ranges,
       IntFunction<ByteBuffer> allocate) throws IOException {
+    readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+      final IntFunction<ByteBuffer> allocate,
+      final Consumer<ByteBuffer> release) throws IOException {
 
     // Validate, but do not pass in a file length as it may change.
     List<? extends FileRange> sortedRanges = sortRangeList(ranges);
 
-    // Set up all of the futures, so that we can use them if things fail
-    for(FileRange range: sortedRanges) {
+    // Set up all of the futures, so that the caller can await on
+    // their completion.
+    for (FileRange range: sortedRanges) {
       validateRangeRequest(range);
       range.setData(new CompletableFuture<>());
     }
-    try {
-      AsynchronousFileChannel channel = getAsyncChannel();
-      ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
-      AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
-      for(int i = 0; i < sortedRanges.size(); ++i) {
-        FileRange range = sortedRanges.get(i);
-        buffers[i] = allocate.apply(range.getLength());
-        channel.read(buffers[i], range.getOffset(), i, asyncHandler);
-      }
-    } catch (IOException ioe) {
-      LOG.debug("Exception occurred during vectored read ", ioe);
-      for(FileRange range: sortedRanges) {
-        range.getData().completeExceptionally(ioe);
-      }
-    }
+    final ByteBufferPool pool = new VectorIOBufferPool(allocate, release);
+    // Initiate the asynchronous reads.
+    new AsyncHandler(getAsyncChannel(),
+        sortedRanges,
+        pool)
+        .initiateRead();
   }
 }
 
 /**
  * A CompletionHandler that implements readFully and translates back
  * into the form of CompletionHandler that our users expect.
+ * <p>
+ * All reads are started in {@link #initiateRead()};
+ * the handler then receives callbacks on success
+ * {@link #completed(Integer, Integer)}, and on failure
+ * by {@link #failed(Throwable, Integer)}.
+ * These are mapped to the specific range in the read, and its
+ * outcome updated.
  */
-static class AsyncHandler implements CompletionHandler<Integer, Integer> {
+private static class AsyncHandler implements CompletionHandler<Integer, Integer> {
+
+  /** File channel to read from. */
   private final AsynchronousFileChannel channel;
+
+  /** Ranges to fetch. */
   private final List<? extends FileRange> ranges;
+
+  /**
+   * Pool providing allocate/release operations.
+   */
+  private final ByteBufferPool allocateRelease;
+
+  /** Buffers being read. */
   private final ByteBuffer[] buffers;
 
-  AsyncHandler(AsynchronousFileChannel channel,
-      List<? extends FileRange> ranges,
-      ByteBuffer[] buffers) {
+  /**
+   * Instantiate.
+   * @param channel open channel.
+   * @param ranges ranges to read.
+   * @param allocateRelease pool for allocating buffers, and releasing on failure
+   */
+  AsyncHandler(
+      final AsynchronousFileChannel channel,
+      final List<? extends FileRange> ranges,
+      final ByteBufferPool allocateRelease) {
     this.channel = channel;
     this.ranges = ranges;
-    this.buffers = buffers;
+    this.buffers = new ByteBuffer[ranges.size()];
+    this.allocateRelease = allocateRelease;
+  }
+
+  /**
+   * Initiate the read operation.
+   * <p>
+   * Allocate all buffers, queue the read into the channel,
+   * providing this object as the handler.
+   */
+  private void initiateRead() {
+    for(int i = 0; i < ranges.size(); ++i) {
+      FileRange range = ranges.get(i);
+      buffers[i] = allocateRelease.getBuffer(false, range.getLength());
+      channel.read(buffers[i], range.getOffset(), i, this);
+    }
   }
 
+  /**
+   * Callback for a completed full/partial read.
+   * <p>
+   * For an EOF the number of bytes may be -1.
+   * That is mapped to a {@link #failed(Throwable, Integer)} outcome.
+   * @param result The bytes read.
+   * @param rangeIndex range index within the range list.
+   */
   @Override
-  public void completed(Integer result, Integer r) {
-    FileRange range = ranges.get(r);
-    ByteBuffer buffer = buffers[r];
+  public void completed(Integer result, Integer rangeIndex) {
+    FileRange range = ranges.get(rangeIndex);
+    ByteBuffer buffer = buffers[rangeIndex];
     if (result == -1) {
-      failed(new EOFException("Read past End of File"), r);
+      // no data was read back.
+      failed(new EOFException("Read past End of File"), rangeIndex);
     } else {
       if (buffer.remaining() > 0) {
         // issue a read for the rest of the buffer
-        // QQ: What if this fails? It has the same handler.
-        channel.read(buffer, range.getOffset() + buffer.position(), r, this);
+        channel.read(buffer, range.getOffset() + buffer.position(), rangeIndex, this);
       } else {
-        // QQ: Why is this required? I think because we don't want the
-        // user to read data beyond limit.
+        // Flip the buffer and declare success.
         buffer.flip();
         range.getData().complete(buffer);
       }
     }
   }
 
+  /**
+   * The read of the range failed.
+   * <p>
+   * Release the buffer supplied for this range, then
+   * report to the future as {@code completeExceptionally(exc)}.
+   * @param exc exception.
+   * @param rangeIndex range index within the range list.
+   */
   @Override
-  public void failed(Throwable exc, Integer r) {
-    LOG.debug("Failed while reading range {} ", r, exc);
-    ranges.get(r).getData().completeExceptionally(exc);
+  public void failed(Throwable exc, Integer rangeIndex) {
+    LOG.debug("Failed while reading range {} ", rangeIndex, exc);
+    // release the buffer
+    allocateRelease.putBuffer(buffers[rangeIndex]);
+    // report the failure.
+    ranges.get(rangeIndex).getData().completeExceptionally(exc);
   }
+
 }
 
 @Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
index 2f99edc910c..6adcba39a3f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.slf4j.Logger;
@@ -52,6 +53,15 @@ public final class VectoredReadUtils {
   private static final Logger LOG =
       LoggerFactory.getLogger(VectoredReadUtils.class);
 
+  /**
+   * This releaser just logs at debug that the buffer
+   * was released.
+   */
+  public static final Consumer<ByteBuffer> LOG_BYTE_BUFFER_RELEASED =
+      (buffer) -> {
+        LOG.debug("Release buffer of length {}: {}", buffer.limit(), buffer);
+      };
+
   /**
    * Validate a single range.
    * @param range range to validate.
@@ -98,8 +108,26 @@ public static void validateVectoredReadRanges(List<? extends FileRange> ranges)
   public static void readVectored(PositionedReadable stream,
       List<? extends FileRange> ranges,
       IntFunction<ByteBuffer> allocate) throws EOFException {
+    readVectored(stream, ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  /**
+   * Variant of {@link #readVectored(PositionedReadable, List, IntFunction)}
+   * where a release() function is invoked if problems surface during reads.
+   * @param stream the stream to read the data from
+   * @param ranges the byte ranges to read
+   * @param allocate the function to allocate ByteBuffer
+   * @param release the function to release a ByteBuffer.
+   * @throws IllegalArgumentException if any of the ranges are invalid, or they overlap.
+   * @throws EOFException the range offset is negative
+   */
+  public static void readVectored(PositionedReadable stream,
+      List<? extends FileRange> ranges,
+      IntFunction<ByteBuffer> allocate,
+      Consumer<ByteBuffer> release) throws EOFException {
+
     for (FileRange range: validateAndSortRanges(ranges, Optional.empty())) {
-      range.setData(readRangeFrom(stream, range, allocate));
+      range.setData(readRangeFrom(stream, range, allocate, release));
     }
   }
 
@@ -118,11 +146,31 @@ public static CompletableFuture<ByteBuffer> readRangeFrom(
       PositionedReadable stream,
       FileRange range,
       IntFunction<ByteBuffer> allocate) throws EOFException {
+    return readRangeFrom(stream, range, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  /**
+   * Synchronously reads a range from the stream dealing with the combinations
+   * of ByteBuffers and PositionedReadable streams.
+   * @param stream the stream to read from
+   * @param range the range to read
+   * @param allocate the function to allocate ByteBuffers
+   * @param release the function to release a ByteBuffer.
+   * @return the CompletableFuture that contains the read data or an exception.
+   * @throws IllegalArgumentException if the range is invalid, other than by a
+   * negative offset or being null.
+   * @throws EOFException the range offset is negative
+   * @throws NullPointerException if the range is null.
+   */
+  public static CompletableFuture<ByteBuffer> readRangeFrom(
+      PositionedReadable stream,
+      FileRange range,
+      IntFunction<ByteBuffer> allocate,
+      Consumer<ByteBuffer> release) throws EOFException {
+
     validateRangeRequest(range);
     CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+    ByteBuffer buffer = allocate.apply(range.getLength());
     try {
-      ByteBuffer buffer = allocate.apply(range.getLength());
       if (stream instanceof ByteBufferPositionedReadable) {
         LOG.debug("ByteBufferPositionedReadable.readFully of {}", range);
         ((ByteBufferPositionedReadable) stream).readFully(range.getOffset(),
@@ -136,6 +184,7 @@ public static CompletableFuture<ByteBuffer> readRangeFrom(
       result.complete(buffer);
     } catch (IOException ioe) {
       LOG.debug("Failed to read {}", range, ioe);
+      release.accept(buffer);
       result.completeExceptionally(ioe);
     }
     return result;
@@ -147,6 +196,8 @@ public static CompletableFuture<ByteBuffer> readRangeFrom(
    * @param range file range
    * @param buffer destination buffer
    * @throws IOException IO problems.
+   * @throws EOFException the end of the data was reached before
+   * the read operation completed
    */
   private static void readNonByteBufferPositionedReadable(
       PositionedReadable stream,
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectorIOBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectorIOBufferPool.java
new file mode 100644
index 00000000000..4548792979a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectorIOBufferPool.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+
+import org.apache.hadoop.io.ByteBufferPool;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A ByteBufferPool implementation that uses a pair of functions to allocate
+ * and release ByteBuffers; intended for use implementing the VectorIO API
+ * as it makes the pair of functions easier to pass around and use in
+ * existing code.
+ * <p>
+ * No matter what kind of buffer is requested, the allocation function
+ * is invoked; that is: the direct flag is ignored.
+ */
+public final class VectorIOBufferPool implements ByteBufferPool {
+
+  /** The function to allocate a buffer. */
+  private final IntFunction<ByteBuffer> allocate;
+
+  /** The function to release a buffer. */
+  private final Consumer<ByteBuffer> release;
+
+  /**
+   * @param allocate function to allocate ByteBuffer
+   * @param release callable to release a ByteBuffer.
+   */
+  public VectorIOBufferPool(
+      IntFunction<ByteBuffer> allocate,
+      Consumer<ByteBuffer> release) {
+    this.allocate = requireNonNull(allocate);
+    this.release = requireNonNull(release);
+  }
+
+  /**
+   * Get a ByteBuffer.
+   * @param direct heap/direct flag. Unused.
+   * @param length The minimum length the buffer will have.
+   * @return a buffer
+   */
+  @Override
+  public ByteBuffer getBuffer(final boolean direct, final int length) {
+    return allocate.apply(length);
+  }
+
+  /**
+   * Release a buffer.
+   * Unlike normal ByteBufferPool implementations
+   * a null buffer is accepted and ignored.
+   * @param buffer buffer to release; may be null.
+   */
+  @Override
+  public void putBuffer(final ByteBuffer buffer) {
+    if (buffer != null) {
+      release.accept(buffer);
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index ef4a8ff11a8..e671cb6fa7c 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -654,3 +654,28 @@ Stream.hasCapability("in:readvectored")
 
 Given the HADOOP-18296 problem with `ChecksumFileSystem` and direct buffers, across all releases,
 it is best to avoid using this API in production with direct buffers.
+
+## `void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate, Consumer<ByteBuffer> release)`
+
+This is an extension of `readVectored/2` with an additional `release` consumer operation to release buffers.
+
+The specification and rules of this method are exactly those of the other operation, with
+the addition of:
+
+Preconditions
+
+```
+if release = null raise NullPointerException
+```
+
+* If a read operation fails due to an `IOException` or similar, the implementation of `readVectored()`
+  SHOULD call `release(buffer)` with the buffer created by invoking the `allocate()` function into which
+  the data was being read.
+* Implementations MUST NOT call `release(buffer)` with any non-null buffer _not_ obtained through `allocate()`.
+* Implementations MUST only call `release(buffer)` when a failure has occurred and the future is about to
+  have `Future.completeExceptionally()` invoked.
+
+It is an extension to the original Vector Read API: not all versions of Hadoop with the original
+`readVectored()` call define it.
+If used directly in application code, that application is restricting itself to later versions
+of the API.
+
+If used via reflection and this method is not found, fall back to the original method.
+
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index dcdfba2add6..e32107be656 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -30,6 +29,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.IntFunction;
 
 import org.assertj.core.api.Assertions;
@@ -47,8 +47,8 @@
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import org.apache.hadoop.util.functional.FutureIO;
 
+import static java.util.Arrays.asList;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
@@ -58,10 +58,16 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.range;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
-import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
+/**
+ * Test Vectored Reads.
+ * <p>
+ * Both the original readVectored(allocator) and the readVectored(allocator, release)
+ * operations are tested.
+ */
 @RunWith(Parameterized.class)
 public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
 
@@ -90,12 +96,19 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
    */
   private Path vectorPath;
 
+  /**
+   * Counter of buffer releases.
+   * Because not all implementations release buffers on failures,
+   * this is not yet used in assertions.
+   */
+  private final AtomicInteger bufferReleases = new AtomicInteger();
+
   @Parameterized.Parameters(name = "Buffer type : {0}")
   public static List<String> params() {
-    return Arrays.asList("direct", "array");
+    return asList("direct", "array");
   }
 
-  public AbstractContractVectoredReadTest(String bufferType) {
+  protected AbstractContractVectoredReadTest(String bufferType) {
     this.bufferType = bufferType;
     final boolean isDirect = !"array".equals(bufferType);
     this.allocate = size -> pool.getBuffer(isDirect, size);
@@ -109,6 +122,15 @@ protected IntFunction<ByteBuffer> getAllocate() {
     return allocate;
   }
 
+  /**
+   * The buffer release operation.
+   */
+  protected void release(ByteBuffer buffer) {
+    LOG.info("Released buffer {}", buffer);
+    bufferReleases.incrementAndGet();
+    pool.putBuffer(buffer);
+  }
+
   /**
    * Get the vector IO buffer pool.
    * @return a pool.
   */
@@ -164,7 +186,7 @@ public void testVectoredReadMultipleRanges() throws Exception {
       fileRanges.add(fileRange);
     }
     try (FSDataInputStream in = openVectorFile()) {
-      in.readVectored(fileRanges, allocate);
+      in.readVectored(fileRanges, allocate, this::release);
       CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
       int i = 0;
       for (FileRange res : fileRanges) {
@@ -186,7 +208,7 @@
       in.readVectored(fileRanges, allocate);
       byte[] readFullRes = new byte[100];
       in.readFully(100, readFullRes);
-      ByteBuffer vecRes = FutureIO.awaitFuture(fileRanges.get(0).getData());
+      ByteBuffer vecRes = awaitFuture(fileRanges.get(0).getData());
       Assertions.assertThat(vecRes)
           .describedAs("Result from vectored read and readFully must match")
           .isEqualByComparingTo(ByteBuffer.wrap(readFullRes));
@@ -201,7 +223,7 @@ public void testVectoredReadWholeFile() throws Exception {
     range(fileRanges, 0, DATASET_LEN);
     try (FSDataInputStream in = openVectorFile()) {
       in.readVectored(fileRanges, allocate);
-      ByteBuffer vecRes = FutureIO.awaitFuture(fileRanges.get(0).getData());
+      ByteBuffer vecRes = awaitFuture(fileRanges.get(0).getData());
       Assertions.assertThat(vecRes)
           .describedAs("Result from vectored read and readFully must match")
           .isEqualByComparingTo(ByteBuffer.wrap(DATASET));
@@ -220,7 +242,7 @@ public void testDisjointRanges() throws Exception {
     range(fileRanges, 4_000 + 101, 100);
     range(fileRanges, 16_000 + 101, 100);
     try (FSDataInputStream in = openVectorFile()) {
-      in.readVectored(fileRanges, allocate);
+      in.readVectored(fileRanges, allocate, this::release);
       validateVectoredReadResult(fileRanges, DATASET, 0);
       returnBuffersToPoolPostRead(fileRanges, pool);
     }
@@ -263,7 +285,7 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception {
         .withFileStatus(fileStatus)
         .build();
     try (FSDataInputStream in = builder.get()) {
-      in.readVectored(fileRanges, allocate);
+      in.readVectored(fileRanges, allocate, this::release);
       validateVectoredReadResult(fileRanges, DATASET, 0);
       returnBuffersToPoolPostRead(fileRanges, pool);
     }
@@ -301,7 +323,7 @@ public void testSameRanges() throws Exception {
     } else {
       try (FSDataInputStream in = openVectorFile()) {
         List<FileRange> fileRanges = getSampleSameRanges();
-        in.readVectored(fileRanges, allocate);
+        in.readVectored(fileRanges, allocate, this::release);
         validateVectoredReadResult(fileRanges, DATASET, 0);
         returnBuffersToPoolPostRead(fileRanges, pool);
       }
@@ -352,7 +374,7 @@ public void testConsecutiveRanges() throws Exception {
     range(fileRanges, offset, length);
     range(fileRanges, offset + length, length);
     try (FSDataInputStream in = openVectorFile()) {
-      in.readVectored(fileRanges, allocate);
+      in.readVectored(fileRanges, allocate, this::release);
       validateVectoredReadResult(fileRanges, DATASET, 0);
       returnBuffersToPoolPostRead(fileRanges, pool);
     }
@@ -408,12 +430,12 @@ public void testVectoredReadWholeFilePlusOne() throws Exception {
   private void expectEOFinRead(final List<FileRange> fileRanges) throws Exception {
     LOG.info("Expecting late EOF failure");
     try (FSDataInputStream in = openVectorFile()) {
-      in.readVectored(fileRanges, allocate);
+      in.readVectored(fileRanges, allocate, this::release);
       for (FileRange res : fileRanges) {
         CompletableFuture<ByteBuffer> data = res.getData();
         interceptFuture(EOFException.class,
             "",
-            ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+            VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
             TimeUnit.SECONDS,
             data);
       }
@@ -431,6 +453,17 @@ public void testNegativeOffsetRange() throws Exception {
     verifyExceptionalVectoredRead(range(-1, 50), EOFException.class);
   }
 
+  @Test
+  public void testNullReleaseOperation() throws Exception {
+
+    final List<FileRange> range = range(0, 10);
+
+    try (FSDataInputStream in = openVectorFile()) {
+      intercept(NullPointerException.class, () ->
+          in.readVectored(range, allocate, null));
+    }
+  }
+
   @Test
   public void testNormalReadAfterVectoredRead() throws Exception {
     List<FileRange> fileRanges = createSampleNonOverlappingRanges();
@@ -469,7 +502,7 @@ public void testMultipleVectoredReads() throws Exception {
     List<FileRange> fileRanges2 = createSampleNonOverlappingRanges();
     try (FSDataInputStream in = openVectorFile()) {
       in.readVectored(fileRanges1, allocate);
-      in.readVectored(fileRanges2, allocate);
+      in.readVectored(fileRanges2, allocate, this::release);
       validateVectoredReadResult(fileRanges2, DATASET, 0);
       validateVectoredReadResult(fileRanges1, DATASET, 0);
       returnBuffersToPoolPostRead(fileRanges1, pool);
@@ -524,7 +557,7 @@ private void readBufferValidateDataAndReturnToPool(FileRange res,
     CompletableFuture<ByteBuffer> data = res.getData();
     // Read the data and perform custom operation. Here we are just
     // validating it with original data.
-    FutureIO.awaitFuture(data.thenAccept(buffer -> {
+    awaitFuture(data.thenAccept(buffer -> {
       assertDatasetEquals((int) res.getOffset(), "vecRead",
           buffer, res.getLength(), DATASET);
       // return buffer to the pool once read.
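[For context before the remaining test and S3A changes: this is roughly how an application might call the new three-argument overload exercised by the contract tests above, with a pool supplying the allocate and release operations. The class and method names in this sketch are illustrative only; they are not part of the patch.]

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.io.ElasticByteBufferPool;

import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

public class VectoredReadWithRelease {

  /**
   * Read two ranges with pooled buffers; if a range fails, the stream
   * hands its buffer back to the pool through the release consumer.
   */
  public static void demo(FSDataInputStream in) throws Exception {
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 1024),
        FileRange.createFileRange(4096, 1024));

    // allocate from the pool; release returns buffers on read failure
    in.readVectored(ranges,
        length -> pool.getBuffer(false, length),
        pool::putBuffer);

    for (FileRange range : ranges) {
      ByteBuffer buffer = awaitFuture(range.getData());
      // ... consume the buffer ...
      pool.putBuffer(buffer);   // return it once processed
    }
  }
}
```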
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
index b08fc95279a..a3913732d1e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.assertj.core.api.Assertions;
@@ -40,6 +41,8 @@
 import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.VectoredReadUtils;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.test.HadoopTestBase;
 
 import static java.util.Arrays.asList;
@@ -823,4 +826,49 @@ public void testReadOverEOFRejected() throws Exception {
         asList(createFileRange(length - 1, 2)),
         Optional.of(length)));
   }
+
+  @Test
+  public void testVectorIOBufferPool() throws Throwable {
+    ElasticByteBufferPool elasticByteBufferPool = new ElasticByteBufferPool();
+
+    // inlined lambda to assert the pool size
+    Consumer<Integer> assertPoolSizeEquals = (size) -> {
+      Assertions.assertThat(elasticByteBufferPool.size(false))
+          .describedAs("Pool size")
+          .isEqualTo(size);
+    };
+
+    // build vector pool from the buffer pool operations converted to
+    // allocate and release lambda expressions
+    ByteBufferPool vectorBuffers = new VectorIOBufferPool(
+        r -> elasticByteBufferPool.getBuffer(false, r),
+        elasticByteBufferPool::putBuffer);
+
+    assertPoolSizeEquals.accept(0);
+
+    final ByteBuffer b1 = vectorBuffers.getBuffer(false, 100);
+    final ByteBuffer b2 = vectorBuffers.getBuffer(false, 50);
+
+    // return the first buffer for a pool size of 1
+    vectorBuffers.putBuffer(b1);
+    assertPoolSizeEquals.accept(1);
+
+    // expect the returned buffer back
+    ByteBuffer b3 = vectorBuffers.getBuffer(true, 100);
+    Assertions.assertThat(b3)
+        .describedAs("buffer returned from a get after a previous one was returned")
+        .isSameAs(b1);
+    assertPoolSizeEquals.accept(0);
+
+    // return them all
+    vectorBuffers.putBuffer(b2);
+    vectorBuffers.putBuffer(b3);
+    assertPoolSizeEquals.accept(2);
+
+    // release does not propagate
+    vectorBuffers.release();
+    assertPoolSizeEquals.accept(2);
+
+    elasticByteBufferPool.release();
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 4d7de5733c0..f5ea8467dce 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import software.amazon.awssdk.core.ResponseInputStream;
@@ -61,6 +62,8 @@
 import org.apache.hadoop.io.IOUtils;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
 import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint;
 import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;
 import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
@@ -825,7 +828,8 @@ public void readFully(long position, byte[] buffer, int offset, int length)
 
   /**
    * {@inheritDoc}
-   * Vectored read implementation for S3AInputStream.
+   * Pass to {@link #readVectored(List, IntFunction, Consumer)}
+   * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
   * @param ranges the byte ranges to read.
   * @param allocate the function to allocate ByteBuffer.
   * @throws IOException IOE if any.
@@ -833,11 +837,29 @@ public void readFully(long position, byte[] buffer, int offset, int length)
   @Override
   public synchronized void readVectored(List<? extends FileRange> ranges,
       IntFunction<ByteBuffer> allocate) throws IOException {
+    readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  /**
+   * {@inheritDoc}
+   * Vectored read implementation for S3AInputStream.
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   * @param release the function to release a ByteBuffer.
+   * @throws IOException IOE if any.
+   */
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+      final IntFunction<ByteBuffer> allocate,
+      final Consumer<ByteBuffer> release) throws IOException {
     LOG.debug("Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
     checkNotClosed();
     if (stopVectoredIOOperations.getAndSet(false)) {
       LOG.debug("Reinstating vectored read operation for path {} ", getPathStr());
     }
+    requireNonNull(ranges, "ranges");
+    requireNonNull(allocate, "allocate");
+    requireNonNull(release, "release");
 
     // prepare to read
     List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
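P.S. The compatibility note added to fsdatainputstream.md above recommends probing via reflection and falling back to readVectored/2 when the three-argument method is absent. A rough sketch of that probe follows; the helper class and its error handling are illustrative assumptions, not part of the patch:

```java
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;

public final class VectoredReadCompat {

  private VectoredReadCompat() {
  }

  /**
   * Invoke readVectored/3 if this Hadoop release defines it;
   * otherwise fall back to readVectored/2.
   */
  public static void readVectored(FSDataInputStream in,
      List<? extends FileRange> ranges,
      IntFunction<ByteBuffer> allocate,
      Consumer<ByteBuffer> release) throws IOException {
    try {
      Method rv3 = FSDataInputStream.class.getMethod("readVectored",
          List.class, IntFunction.class, Consumer.class);
      rv3.invoke(in, ranges, allocate, release);
    } catch (NoSuchMethodException e) {
      // older release: no release() pass-down, so buffers are not
      // returned to the pool on failure
      in.readVectored(ranges, allocate);
    } catch (InvocationTargetException e) {
      // unwrap the exception raised inside readVectored()
      Throwable cause = e.getCause();
      throw cause instanceof IOException
          ? (IOException) cause
          : new IOException(cause);
    } catch (IllegalAccessException e) {
      throw new IOException(e);
    }
  }
}
```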