steveloughran commented on code in PR #4445:
URL: https://github.com/apache/hadoop/pull/4445#discussion_r901842666


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -272,43 +341,64 @@ public void testVectoredReadAfterNormalRead() throws 
Exception {
               .isEqualTo(200);
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
   @Test
   public void testMultipleVectoredReads() throws Exception {
     FileSystem fs = getFileSystem();
-    List<FileRange> fileRanges1 = createSomeOverlappingRanges();
-    List<FileRange> fileRanges2 = createSomeOverlappingRanges();
+    List<FileRange> fileRanges1 = createSampleNonOverlappingRanges();
+    List<FileRange> fileRanges2 = createSampleNonOverlappingRanges();
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges1, allocate);
       in.readVectored(fileRanges2, allocate);
       validateVectoredReadResult(fileRanges2, DATASET);
       validateVectoredReadResult(fileRanges1, DATASET);
+      returnBuffersToPoolPostRead(fileRanges1, pool);
+      returnBuffersToPoolPostRead(fileRanges2, pool);
     }
   }
 
-  protected List<FileRange> createSomeOverlappingRanges() {
+  protected List<FileRange> createSampleNonOverlappingRanges() {
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(0, 100));
-    fileRanges.add(new FileRangeImpl(90, 50));
+    fileRanges.add(FileRange.createFileRange(0, 100));
+    fileRanges.add(FileRange.createFileRange(110, 50));
     return fileRanges;
   }
 
+  protected List<FileRange> getSampleSameRanges() {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(8_000, 1000));
+    fileRanges.add(FileRange.createFileRange(8_000, 1000));
+    fileRanges.add(FileRange.createFileRange(8_000, 1000));
+    return fileRanges;
+  }
 
-  protected void testExceptionalVectoredRead(FileSystem fs,
+  protected List<FileRange> getSampleOverlappingRanges() {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(100, 500));
+    fileRanges.add(FileRange.createFileRange(400, 500));
+    return fileRanges;
+  }
+
+  /**
+   * Validate that exceptions must be thrown during a vectored
+   * read operation with specific input ranges.
+   * @param fs FileSystem instance.
+   * @param fileRanges input file ranges.
+   * @param clazz type of exception expected.
+   * @throws Exception any other IOE.
+   */
+  protected <T extends Throwable> void testExceptionalVectoredRead(FileSystem 
fs,

Review Comment:
   can you call this something other than test-, e.g 
verifyExceptionalVectoredRead()



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,135 @@ public void readVectored(List<? extends FileRange> 
ranges,
 
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, 
ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges = 
validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
 
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for (FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
       }
     } else {
       LOG.debug("Trying to merge the ranges as they are not disjoint");
-      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+      List<CombinedFileRange> combinedFileRanges = 
mergeSortedRanges(sortedRanges,
               1, minSeekForVectorReads(),
               maxReadSizeForVectorReads());
       LOG.debug("Number of original ranges size {} , Number of combined ranges 
{} ",
               ranges.size(), combinedFileRanges.size());
-      for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
-        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-        combinedFileRange.setData(result);
+      for (CombinedFileRange combinedFileRange: combinedFileRanges) {
         unboundedThreadPool.submit(
-            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
buffer));
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
allocate));
       }
     }
     LOG.debug("Finished submitting vectored read to threadpool" +
             " on path {} for ranges {} ", pathStr, ranges);
   }
 
   /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all 
the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
    */
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange 
combinedFileRange,
-                                                  ByteBuffer buffer) {
-    // Not putting read single range call inside try block as
-    // exception if any occurred during this call will be raised
-    // during awaitFuture call while getting the combined buffer.
-    readSingleRange(combinedFileRange, buffer);
+                                                  IntFunction<ByteBuffer> 
allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", 
combinedFileRange, pathStr);
+    // This reference is must be kept till all buffers are populated as this 
is a
+    // finalizable object which closes the internal stream when gc triggers.
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
     try {
-      // In case of single range we return the original byte buffer else
-      // we return slice byte buffers for each child ranges.
-      ByteBuffer combinedBuffer = 
FutureIOSupport.awaitFuture(combinedFileRange.getData());
-      if (combinedFileRange.getUnderlying().size() == 1) {
-        
combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-      } else {
-        for (FileRange child : combinedFileRange.getUnderlying()) {
-          updateOriginalRange(child, combinedBuffer, combinedFileRange);
-        }
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,
+              combinedFileRange.getOffset(),
+              combinedFileRange.getLength());
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+                "Null IO stream received during " + operationName);
       }
+      populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
-      LOG.warn("Exception occurred while reading combined range from file {}", 
pathStr, ex);
+      LOG.warn("Exception while reading a range {} from path {} ", 
combinedFileRange, pathStr, ex);

Review Comment:
   will we log noisily on an EOFException? as that probably doesn't need a 
stack trace & can just be logged at DEBUG



##########
hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java:
##########
@@ -47,7 +47,7 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.impl.FileRangeImpl;

Review Comment:
   shouldn't be needed now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to