adoroszlai commented on code in PR #9402:
URL: https://github.com/apache/ozone/pull/9402#discussion_r2587898276


##########
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java:
##########
@@ -121,4 +125,24 @@ public boolean seekToNewSource(long targetPos) throws IOException {
           "underlying inputStream");
     }
   }
+
+  /**
+   * Read data from multiple byte ranges asynchronously.
+   * This allows reading multiple discontiguous ranges from the same file
+   * efficiently with a single API call.
+   *
+   * @param ranges list of file ranges to read
+   * @param allocate function to allocate ByteBuffer for each range
+   * @throws IOException if there is an error performing the reads
+   */
+  public void readVectored(List<? extends FileRange> ranges,
+                           IntFunction<ByteBuffer> allocate) throws IOException {

Review Comment:
   nit:
   
   ```suggestion
     public void readVectored(
         List<? extends FileRange> ranges,
         IntFunction<ByteBuffer> allocate
     ) throws IOException {
   ```



##########
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##########
@@ -199,4 +203,56 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
       }
     }
   }
+
+  /**
+   * Implements vectored read by reading each range asynchronously.
+   * This allows clients to read multiple byte ranges from the same file
+   * in a single call, potentially improving performance by enabling
+   * parallel reads and reducing round-trip overhead.
+   *
+   * @param ranges list of file ranges to read
+   * @param allocate function to allocate ByteBuffer for each range
+   * @throws IOException if there is an error performing the reads
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                           IntFunction<ByteBuffer> allocate) throws IOException {

Review Comment:
   nit:
   
   ```suggestion
     public void readVectored(
         List<? extends FileRange> ranges,
         IntFunction<ByteBuffer> allocate
     ) throws IOException {
   ```



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java:
##########
@@ -232,6 +236,75 @@ public synchronized long skip(long n) throws IOException {
     return toSkip;
   }
 
+  /**
+   * Implements vectored read for multipart input stream.
+   * This method reads multiple byte ranges asynchronously, potentially
+   * from different underlying part streams.
+   *
+   * @param ranges list of file ranges to read
+   * @param allocate function to allocate ByteBuffer for each range
+   * @throws IOException if there is an error performing the reads
+   */
+  public void readVectored(List<? extends FileRange> ranges,
+                           IntFunction<ByteBuffer> allocate) throws IOException {

Review Comment:
   nit: Please do not format method signatures like this. Whenever the visibility, return type, method name, or other modifiers change, we would have to reindent all parameters.
   
   ```suggestion
     public void readVectored(
         List<? extends FileRange> ranges,
         IntFunction<ByteBuffer> allocate
     ) throws IOException {
   ```



##########
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##########
@@ -199,4 +203,56 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
       }
     }
   }
+
+  /**
+   * Implements vectored read by reading each range asynchronously.
+   * This allows clients to read multiple byte ranges from the same file
+   * in a single call, potentially improving performance by enabling
+   * parallel reads and reducing round-trip overhead.
+   *
+   * @param ranges list of file ranges to read
+   * @param allocate function to allocate ByteBuffer for each range
+   * @throws IOException if there is an error performing the reads
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                           IntFunction<ByteBuffer> allocate) throws IOException {
+    TracingUtil.executeInNewSpan("OzoneFSInputStream.readVectored", () -> {
+      // Perform vectored read using positioned read operations
+      for (FileRange range : ranges) {
+        CompletableFuture<ByteBuffer> result = range.getData();
+        if (result == null) {
+          result = new CompletableFuture<>();
+          range.setData(result);
+        }
+
+        final CompletableFuture<ByteBuffer> finalResult = result;
+        final long offset = range.getOffset();
+        final int length = range.getLength();
+
+        // Submit async read task for this range
+        CompletableFuture.runAsync(() -> {
+          try {
+            ByteBuffer buffer = allocate.apply(length);
+            int bytesRead = read(offset, buffer);
+
+            if (bytesRead < length) {
+              finalResult.completeExceptionally(
+                  new EOFException("Requested " + length +
+                      " bytes but only read " + bytesRead));

Review Comment:
   `read()` is not guaranteed to read enough data to fill the buffer. For that, use `readFully()`.
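To make the point concrete, here is a minimal sketch of what a `readFully`-style loop does on top of a positioned `read`. The `PositionalReader` interface and the `chunkedReader` helper are hypothetical stand-ins, not Ozone or Hadoop APIs; the sketch assumes `read` returns the number of bytes copied (possibly fewer than requested) and `-1` at end of file. In the PR itself, the fix would simply be to call the stream's existing `readFully(long position, ByteBuffer buf)` (shown in the hunk header above) instead of `read(offset, buffer)`.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReadFullySketch {

  /** Hypothetical stand-in for a positioned read such as read(long, ByteBuffer). */
  @FunctionalInterface
  interface PositionalReader {
    /** Reads up to buf.remaining() bytes at position; returns the count, or -1 at EOF. */
    int read(long position, ByteBuffer buf) throws IOException;
  }

  /** Repeats read() until buf is full; a short read by itself is not an error. */
  static void readFully(PositionalReader reader, long position, ByteBuffer buf)
      throws IOException {
    long pos = position;
    while (buf.hasRemaining()) {
      int n = reader.read(pos, buf);
      if (n < 0) {
        // Only a real EOF before the buffer is full means the range is too long.
        throw new EOFException("EOF at position " + pos + ", "
            + buf.remaining() + " bytes still expected");
      }
      pos += n;
    }
  }

  /** Hypothetical in-memory "file" whose read() returns at most maxPerCall bytes per call. */
  static PositionalReader chunkedReader(byte[] file, int maxPerCall) {
    return (position, buf) -> {
      if (position >= file.length) {
        return -1;
      }
      int n = (int) Math.min(Math.min(maxPerCall, buf.remaining()), file.length - position);
      buf.put(file, (int) position, n);
      return n;
    };
  }

  public static void main(String[] args) throws IOException {
    byte[] file = "hello, vectored world".getBytes(StandardCharsets.US_ASCII);
    PositionalReader reader = chunkedReader(file, 3);

    // A single read() returns only 3 of the 8 requested bytes.
    System.out.println(reader.read(7, ByteBuffer.allocate(8)));  // 3

    // readFully() keeps reading until all 8 bytes are in the buffer.
    ByteBuffer buf = ByteBuffer.allocate(8);
    readFully(reader, 7, buf);
    buf.flip();
    System.out.println(StandardCharsets.US_ASCII.decode(buf));   // vectored
  }
}
```

Assuming the stream's `readFully` likewise throws `EOFException` when the file ends before the buffer is full, the separate `bytesRead < length` branch becomes unnecessary: the existing `catch (Exception e)` block already turns that exception into `completeExceptionally`.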



##########
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java:
##########
@@ -199,4 +203,56 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
       }
     }
   }
+
+  /**
+   * Implements vectored read by reading each range asynchronously.
+   * This allows clients to read multiple byte ranges from the same file
+   * in a single call, potentially improving performance by enabling
+   * parallel reads and reducing round-trip overhead.
+   *
+   * @param ranges list of file ranges to read
+   * @param allocate function to allocate ByteBuffer for each range
+   * @throws IOException if there is an error performing the reads
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                           IntFunction<ByteBuffer> allocate) throws IOException {
+    TracingUtil.executeInNewSpan("OzoneFSInputStream.readVectored", () -> {
+      // Perform vectored read using positioned read operations
+      for (FileRange range : ranges) {
+        CompletableFuture<ByteBuffer> result = range.getData();
+        if (result == null) {
+          result = new CompletableFuture<>();
+          range.setData(result);
+        }
+
+        final CompletableFuture<ByteBuffer> finalResult = result;
+        final long offset = range.getOffset();
+        final int length = range.getLength();
+
+        // Submit async read task for this range
+        CompletableFuture.runAsync(() -> {
+          try {
+            ByteBuffer buffer = allocate.apply(length);
+            int bytesRead = read(offset, buffer);
+
+            if (bytesRead < length) {
+              finalResult.completeExceptionally(
+                  new EOFException("Requested " + length +
+                      " bytes but only read " + bytesRead));
+            } else {
+              buffer.flip();
+              if (statistics != null) {
+                statistics.incrementBytesRead(bytesRead);
+              }
+              finalResult.complete(buffer);
+            }
+          } catch (Exception e) {
+            finalResult.completeExceptionally(e);
+          }
+        });
+      }

Review Comment:
   This almost duplicates the implementation in `MultipartInputStream`. Can they share code?
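One way to remove the duplication would be to move the per-range loop into a single static helper that both `MultipartInputStream` and `OzoneFSInputStream` call, each passing in its own positioned read. The sketch below assumes that shape. `VectoredReadSupport`, `Range` (a simplified stand-in for Hadoop's `FileRange`), `PositionalReader`, and the `Executor` parameter are all hypothetical names chosen for illustration. The sketch also applies the `readFully` fix from the earlier comment and uses the suggested one-parameter-per-line signature.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.IntFunction;

/** Hypothetical shared helper; class and method names are illustrative only. */
public final class VectoredReadSupport {
  /** Hypothetical stand-in for Hadoop's FileRange (offset, length, result future). */
  public static final class Range {
    private final long offset;
    private final int length;
    private CompletableFuture<ByteBuffer> data;
    public Range(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }
    public long getOffset() { return offset; }
    public int getLength() { return length; }
    public CompletableFuture<ByteBuffer> getData() { return data; }
    public void setData(CompletableFuture<ByteBuffer> data) { this.data = data; }
  }

  /** Positioned read that may return fewer bytes than requested, or -1 at EOF. */
  @FunctionalInterface
  public interface PositionalReader {
    int read(long position, ByteBuffer buf) throws IOException;
  }

  /** Shared loop: each stream supplies its own positioned reader and executor. */
  public static void readVectored(
      List<? extends Range> ranges,
      IntFunction<ByteBuffer> allocate,
      PositionalReader reader,
      Executor executor
  ) {
    for (Range range : ranges) {
      // Reuse a future already attached to the range, as the PR code does.
      CompletableFuture<ByteBuffer> result = range.getData();
      if (result == null) {
        result = new CompletableFuture<>();
        range.setData(result);
      }
      final CompletableFuture<ByteBuffer> future = result;
      executor.execute(() -> {
        try {
          ByteBuffer buf = allocate.apply(range.getLength());
          buf.limit(buf.position() + range.getLength()); // never read past the range
          readFully(reader, range.getOffset(), buf);
          buf.flip();
          future.complete(buf);
        } catch (Exception e) {
          future.completeExceptionally(e);
        }
      });
    }
  }

  /** Loops over short reads; EOFException only if the file ends inside the range. */
  static void readFully(PositionalReader reader, long position, ByteBuffer buf) throws IOException {
    long pos = position;
    while (buf.hasRemaining()) {
      int n = reader.read(pos, buf);
      if (n < 0) {
        throw new EOFException("EOF at " + pos + ", " + buf.remaining() + " bytes missing");
      }
      pos += n;
    }
  }

  /** Hypothetical in-memory "file" whose read() returns at most maxPerCall bytes per call. */
  static PositionalReader chunkedReader(byte[] file, int maxPerCall) {
    return (position, buf) -> {
      if (position >= file.length) {
        return -1;
      }
      int n = (int) Math.min(Math.min(maxPerCall, buf.remaining()), file.length - position);
      buf.put(file, (int) position, n);
      return n;
    };
  }

  public static void main(String[] args) {
    byte[] file = "0123456789abcdef".getBytes(StandardCharsets.US_ASCII);
    List<Range> ranges = Arrays.asList(new Range(0, 4), new Range(10, 6), new Range(14, 5));
    readVectored(ranges, ByteBuffer::allocate, chunkedReader(file, 3), ForkJoinPool.commonPool());
    for (Range r : ranges) {
      // Prints each range's contents, or the failure for the range that runs past EOF.
      r.getData().handle((buf, err) -> err == null
          ? StandardCharsets.US_ASCII.decode(buf).toString() : "failed: " + err)
          .thenAccept(System.out::println).join();
    }
  }
}
```

In this shape, stream-specific parts such as the `TracingUtil.executeInNewSpan` wrapper and the `statistics.incrementBytesRead` call would stay in each stream's override. Taking an explicit `Executor` instead of relying on `CompletableFuture.runAsync`'s default executor is a choice made for the sketch; it also makes the helper easy to test with a same-thread executor.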



-- 
This is an automated message from the Apache Git Service.