chungen0126 commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2694657039
##########
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##########
@@ -207,4 +211,44 @@ public void readFully(long position, ByteBuffer buf)
throws IOException {
}
}
}
+
+ /**
+ * Implements vectored read by reading each range asynchronously.
+ * This allows clients to read multiple byte ranges from the same file
+ * in a single call, potentially improving performance by enabling
+ * parallel reads and reducing round-trip overhead.
+ *
+ * @param ranges list of file ranges to read
+ * @param allocate function to allocate ByteBuffer for each range
+ * @throws IOException if there is an error performing the reads
+ * @apiNote This method is synchronized to prevent race conditions from
+ * concurrent readVectored() calls on the same stream instance.
+ */
+ @Override
+ public synchronized void readVectored(List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate) throws IOException {
+ TracingUtil.executeInNewSpan("OzoneFSInputStream.readVectored", () -> {
+ // Save the initial position
+ final long initialPosition = getPos();
+
+ // Use common vectored read implementation
+ VectoredReadUtils.performVectoredRead(
+ ranges,
+ allocate,
+ (offset, buffer) -> {
+ // readFully is synchronized and uses positioned reads
+ // which automatically preserve stream position
+ readFully(offset, buffer);
+ if (statistics != null) {
+ statistics.incrementBytesRead(buffer.remaining());
+ }
+ }
+ );
+
+ // Restore position before returning from method
+ seek(initialPosition);
Review Comment:
Same comment as above.
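The concern can be shown with a minimal sketch. It uses a toy in-memory stream instead of the real `OzoneFSInputStream`, so the `PositionedStream` class and its methods are hypothetical. If every range is served by a positioned `readFully(offset, buffer)`, as the inline comment in the diff says, the stream's cursor never moves. The caller's position is therefore preserved even though the ranges complete asynchronously, and no trailing `seek(initialPosition)` is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical in-memory stream; NOT the Ozone OzoneFSInputStream API.
class PositionedStream {
  private final byte[] data;
  private long pos; // cursor used by sequential reads and seek()

  PositionedStream(byte[] data) {
    this.data = data;
  }

  synchronized long getPos() {
    return pos;
  }

  synchronized void seek(long newPos) {
    pos = newPos;
  }

  // Positioned read: copies bytes starting at 'position' without touching 'pos'.
  synchronized void readFully(long position, byte[] dst) {
    if (position < 0 || position + dst.length > data.length) {
      throw new IndexOutOfBoundsException("range past end of data");
    }
    System.arraycopy(data, (int) position, dst, 0, dst.length);
  }

  // Vectored read built only from positioned reads. Each {offset, length}
  // range completes asynchronously on 'pool'. No trailing seek is needed
  // because 'pos' is never modified by readFully.
  List<CompletableFuture<byte[]>> readVectored(long[][] ranges, Executor pool) {
    List<CompletableFuture<byte[]>> results = new ArrayList<>();
    for (long[] range : ranges) {
      long offset = range[0];
      int length = (int) range[1];
      results.add(CompletableFuture.supplyAsync(() -> {
        byte[] buf = new byte[length];
        readFully(offset, buf);
        return buf;
      }, pool));
    }
    return results;
  }
}
```

A caller might `seek(42)`, issue several ranges, and join the futures. `getPos()` still returns 42 afterwards. A seek placed after the dispatch loop would run as soon as dispatch finishes, not when the reads finish. In this design it would be redundant at best.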
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java:
##########
@@ -261,6 +264,95 @@ public synchronized long skip(long n) throws IOException {
return toSkip;
}
+ /**
+ * Implements vectored read for multipart input stream.
+ * This method reads multiple byte ranges asynchronously, potentially
+ * from different underlying part streams.
+ *
+ * @param ranges list of file ranges to read
+ * @param allocate function to allocate ByteBuffer for each range
+ * @throws IOException if there is an error performing the reads
+ * @apiNote This method is synchronized to prevent race conditions from
+ * concurrent readVectored() calls on the same stream instance.
+ */
+ public synchronized void readVectored(
+ List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate
+ ) throws IOException {
+ checkOpen();
+ if (!initialized) {
+ initialize();
+ }
+
+ // Save the initial position
+ final long initialPosition = getPos();
+
+ // Use common vectored read implementation
+ VectoredReadUtils.performVectoredRead(
+ ranges,
+ allocate,
+ (offset, buffer) -> readRangeData(offset, buffer, initialPosition)
+ );
+
+ // Restore position
+ seek(initialPosition);
Review Comment:
I'm wondering whether the seek to initialPosition is necessary here. Since the
ranges are read asynchronously, the position may still be changing when this
line runs, so restoring it at this point might not work correctly. It seems
that readRangeData already restores the position correctly.
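The point about `readRangeData` can be sketched as follows. The diff does not show the body of `readRangeData(offset, buffer, initialPosition)`. This sketch assumes that it seeks to `offset`, reads, and seeks back to `initialPosition` while holding the stream lock. `SeekableStream` and `readSequential` are hypothetical stand-ins for `MultipartInputStream`. Each range restores the position inside one synchronized method, so the position is already back at its initial value once all futures complete. Under that assumption, a final `seek(initialPosition)` in `readVectored` adds nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical seek-based stream standing in for MultipartInputStream.
class SeekableStream {
  private final byte[] data;
  private long pos;

  SeekableStream(byte[] data) {
    this.data = data;
  }

  synchronized long getPos() {
    return pos;
  }

  synchronized void seek(long newPos) {
    if (newPos < 0 || newPos > data.length) {
      throw new IllegalArgumentException("seek out of range: " + newPos);
    }
    pos = newPos;
  }

  // Sequential read that advances the cursor by dst.length bytes.
  synchronized void readSequential(byte[] dst) {
    System.arraycopy(data, (int) pos, dst, 0, dst.length);
    pos += dst.length;
  }

  // Hypothetical analogue of readRangeData: seek, read, and restore, all
  // under the stream lock, so no other range sees the intermediate position.
  synchronized void readRangeData(long offset, byte[] dst, long initialPosition) {
    seek(offset);
    readSequential(dst);
    seek(initialPosition);
  }

  List<CompletableFuture<byte[]>> readVectored(long[][] ranges, Executor pool) {
    final long initialPosition = getPos();
    List<CompletableFuture<byte[]>> results = new ArrayList<>();
    for (long[] range : ranges) {
      long offset = range[0];
      int length = (int) range[1];
      results.add(CompletableFuture.supplyAsync(() -> {
        byte[] buf = new byte[length];
        readRangeData(offset, buf, initialPosition);
        return buf;
      }, pool));
    }
    // No trailing seek(initialPosition): each range already restores it.
    return results;
  }
}
```

In this sketch, once every returned future has completed, `getPos()` equals the value captured at the start of `readVectored`. No outer restore is involved.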
--
This is an automated message from the Apache Git Service.