mukund-thakur commented on code in PR #7418:
URL: https://github.com/apache/hadoop/pull/7418#discussion_r1966018354
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java:
##########
@@ -319,74 +321,125 @@ AsynchronousFileChannel getAsyncChannel() throws IOException {
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
+ readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+ }
+
+ @Override
+ public void readVectored(final List<? extends FileRange> ranges,
+ final IntFunction<ByteBuffer> allocate,
+ final Consumer<ByteBuffer> release) throws IOException {
// Validate, but do not pass in a file length as it may change.
List<? extends FileRange> sortedRanges = sortRangeList(ranges);
- // Set up all of the futures, so that we can use them if things fail
- for(FileRange range: sortedRanges) {
+ // Set up all of the futures, so that the caller can await
+ // their completion.
+ for (FileRange range: sortedRanges) {
validateRangeRequest(range);
range.setData(new CompletableFuture<>());
}
- try {
- AsynchronousFileChannel channel = getAsyncChannel();
- ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
- AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
- for(int i = 0; i < sortedRanges.size(); ++i) {
- FileRange range = sortedRanges.get(i);
- buffers[i] = allocate.apply(range.getLength());
- channel.read(buffers[i], range.getOffset(), i, asyncHandler);
- }
- } catch (IOException ioe) {
- LOG.debug("Exception occurred during vectored read ", ioe);
Review Comment:
This seems to have been unnecessary: any failure will automatically be caught in
the failed() method.
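A minimal sketch of the pattern this comment relies on, for reference. With `AsynchronousFileChannel.read(ByteBuffer, long, A, CompletionHandler)`, I/O errors that occur after the read has been initiated are delivered to the handler's `failed()` callback rather than thrown from `read()`, which makes `failed()` a natural place to fail the range's future and hand the unread buffer to `release`. `RangeReadHandler` is a hypothetical per-range handler, not the PR's `AsyncHandler`, and it assumes a single read fills the buffer, which real code cannot.

```java
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
 * Hypothetical handler for one range of a vectored read (not the PR's AsyncHandler).
 * One instance would be passed per range to
 * AsynchronousFileChannel.read(buffer, offset, null, handler).
 */
final class RangeReadHandler implements CompletionHandler<Integer, Void> {
  private final ByteBuffer buffer;
  private final CompletableFuture<ByteBuffer> result;
  private final Consumer<ByteBuffer> release;

  RangeReadHandler(ByteBuffer buffer,
      CompletableFuture<ByteBuffer> result,
      Consumer<ByteBuffer> release) {
    this.buffer = buffer;
    this.result = result;
    this.release = release;
  }

  @Override
  public void completed(Integer bytesRead, Void attachment) {
    // Simplification: assumes one read filled the buffer; real code must
    // reissue the read until the buffer is full or EOF (bytesRead == -1).
    buffer.flip();
    result.complete(buffer);
  }

  @Override
  public void failed(Throwable exc, Void attachment) {
    // The buffer never reaches the caller, so give it back before
    // failing the future the caller is waiting on.
    release.accept(buffer);
    result.completeExceptionally(exc);
  }
}
```

Only the failure path calls `release`; a successfully filled buffer is handed to the caller through the future and is the caller's to manage.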
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java:
##########
@@ -136,4 +137,30 @@ default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
VectoredReadUtils.readVectored(this, ranges, allocate);
}
+
+ /**
+ * Variant of {@link #readVectored(List, IntFunction)} where a release() function
+ * may be invoked if problems surface during reads; this method is called to
+ * try to return any allocated buffer which has not been read yet.
+ * Buffers which have successfully been read and returned to the caller do not
+ * get released: this is for failures only.
+ * <p>
+ * The default implementation calls readVectored/2 so as to ensure that
+ * if an existing stream implementation does not implement this method
+ * all is good.
+ * <p>
+ * Implementations SHOULD override this method if they can release buffers as
+ * part of their error handling.
+ * @param ranges the byte ranges to read
+ * @param allocate the function to allocate ByteBuffer
Review Comment:
nit: remove "the"
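The default-method arrangement this javadoc describes can be sketched with stand-in types: the three-argument overload ignores `release` and calls the two-argument one, so an existing implementation keeps working unchanged, while an implementation that can clean up on failure overrides it. `VectoredSource` and `Range` are hypothetical stand-ins, not Hadoop's `PositionedReadable` or `FileRange`.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/** Hypothetical stand-in for a byte range; not Hadoop's FileRange. */
final class Range {
  final long offset;
  final int length;
  final CompletableFuture<ByteBuffer> data = new CompletableFuture<>();

  Range(long offset, int length) {
    this.offset = offset;
    this.length = length;
  }
}

/** Hypothetical stand-in for the readVectored/2 and readVectored/3 pair. */
interface VectoredSource {
  void readVectored(List<Range> ranges, IntFunction<ByteBuffer> allocate);

  /** Default: release is ignored, so existing implementations behave as before. */
  default void readVectored(List<Range> ranges,
      IntFunction<ByteBuffer> allocate,
      Consumer<ByteBuffer> release) {
    readVectored(ranges, allocate);
  }
}
```

A stream that can reclaim buffers on error would override the three-argument method and call `release` from its failure path; one that cannot simply inherits the default.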
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java:
##########
@@ -136,4 +137,30 @@ default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
VectoredReadUtils.readVectored(this, ranges, allocate);
}
+
+ /**
+ * Variant of {@link #readVectored(List, IntFunction)} where a release() function
+ * may be invoked if problems surface during reads; this method is called to
+ * try to return any allocated buffer which has not been read yet.
+ * Buffers which have successfully been read and returned to the caller do not
+ * get released: this is for failures only.
+ * <p>
+ * The default implementation calls readVectored/2 so as to ensure that
+ * if an existing stream implementation does not implement this method
+ * all is good.
+ * <p>
+ * Implementations SHOULD override this method if they can release buffers as
+ * part of their error handling.
+ * @param ranges the byte ranges to read
+ * @param allocate the function to allocate ByteBuffer
+ * @param release the function to release a ByteBuffer.
Review Comment:
nit: remove "the"
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectorIOBufferPool.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+
+import org.apache.hadoop.io.ByteBufferPool;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A ByteBufferPool implementation that uses a pair of functions to allocate
+ * and release ByteBuffers; intended for use implementing the VectorIO API
+ * as it makes the pair of functions easier to pass around and use in
+ * existing code.
+ * <p>
+ * No matter what kind of buffer is requested, the allocation function
+ * is invoked; that is: the direct flag is ignored.
+ */
+public final class VectorIOBufferPool implements ByteBufferPool {
Review Comment:
Why do we need this when we already have WeakReferencedElasticByteBufferPool?
Shouldn't an actual implementation of the release function just call putBuffer()
on the WeakReferencedElasticByteBufferPool?
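The reviewer's alternative amounts to deriving the function pair from an existing pool, roughly `allocate = len -> pool.getBuffer(false, len)` and `release = pool::putBuffer`. A minimal sketch, assuming a pool with the same `getBuffer(boolean, int)` / `putBuffer(ByteBuffer)` shape as `org.apache.hadoop.io.ByteBufferPool`; `SimplePool`, `HeapPool` and `PoolFunctions` are hypothetical stand-ins so the example runs without Hadoop, and with Hadoop on the classpath `WeakReferencedElasticByteBufferPool` would take the pool's place.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/** Hypothetical pool with the same method shape as Hadoop's ByteBufferPool. */
interface SimplePool {
  ByteBuffer getBuffer(boolean direct, int length);
  void putBuffer(ByteBuffer buffer);
}

/** Toy heap-only pool: reuses the most recently returned buffer if it is large enough. */
final class HeapPool implements SimplePool {
  private final Deque<ByteBuffer> free = new ArrayDeque<>();

  @Override
  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
    // Toy simplification: 'direct' is ignored and heap buffers are always returned.
    ByteBuffer candidate = free.peekFirst();
    if (candidate != null && candidate.capacity() >= length) {
      free.pollFirst();
      candidate.clear();
      candidate.limit(length);
      return candidate;
    }
    return ByteBuffer.allocate(length);
  }

  @Override
  public synchronized void putBuffer(ByteBuffer buffer) {
    free.addFirst(buffer);
  }

  synchronized int pooled() {
    return free.size();
  }
}

/** The allocate/release pair a vectored read needs, derived from a pool. */
final class PoolFunctions {
  private PoolFunctions() {}

  static IntFunction<ByteBuffer> allocator(SimplePool pool) {
    return length -> pool.getBuffer(false, length);
  }

  static Consumer<ByteBuffer> releaser(SimplePool pool) {
    return pool::putBuffer;
  }
}
```

A class like `VectorIOBufferPool` goes the other way, wrapping a function pair as a pool; which direction is more useful depends on whether existing call sites hold a pool or a pair of functions.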
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java:
##########
@@ -52,6 +53,15 @@ public final class VectoredReadUtils {
private static final Logger LOG =
LoggerFactory.getLogger(VectoredReadUtils.class);
+ /**
+ * This releaser just logs at debug that the buffer
+ * was released.
+ */
+ public static final Consumer<ByteBuffer> LOG_BYTE_BUFFER_RELEASED =
+ (buffer) -> {
+ LOG.debug("release buffer {}", buffer.toString());
Review Comment:
Should we parameterize this and log something like "releasing buffer for range[x-y]"?
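One way to act on this without changing the `Consumer<ByteBuffer>` signature is to build the releaser per range, capturing the range's offset and length so the message can name the range. `RangeReleaseLogging`, `describe` and `releaserFor` are hypothetical, and the sketch uses `java.util.logging` only so it runs with the JDK alone; with SLF4J the same shape would use `{}` placeholders. Passing `buffer` itself rather than `buffer.toString()` as a placeholder argument also avoids building the string when debug logging is off.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.logging.Logger;

/** Hypothetical per-range releaser that logs which range's buffer was released. */
final class RangeReleaseLogging {
  private static final Logger LOG =
      Logger.getLogger(RangeReleaseLogging.class.getName());

  private RangeReleaseLogging() {}

  /** Message for a released buffer; the end offset is exclusive. */
  static String describe(long offset, int length, ByteBuffer buffer) {
    return String.format("releasing buffer for range[%d-%d) %s",
        offset, offset + length, buffer);
  }

  /** Builds a releaser bound to one range; the message is only built if FINE is enabled. */
  static Consumer<ByteBuffer> releaserFor(long offset, int length) {
    return buffer -> LOG.fine(() -> describe(offset, length, buffer));
  }
}
```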