[ 
https://issues.apache.org/jira/browse/HADOOP-18106?focusedWorklogId=783080&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783080
 ]

ASF GitHub Bot logged work on HADOOP-18106:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Jun/22 16:47
            Start Date: 20/Jun/22 16:47
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on code in PR #4445:
URL: https://github.com/apache/hadoop/pull/4445#discussion_r901842666


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -272,43 +341,64 @@ public void testVectoredReadAfterNormalRead() throws 
Exception {
               .isEqualTo(200);
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
   @Test
   public void testMultipleVectoredReads() throws Exception {
     FileSystem fs = getFileSystem();
-    List<FileRange> fileRanges1 = createSomeOverlappingRanges();
-    List<FileRange> fileRanges2 = createSomeOverlappingRanges();
+    List<FileRange> fileRanges1 = createSampleNonOverlappingRanges();
+    List<FileRange> fileRanges2 = createSampleNonOverlappingRanges();
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges1, allocate);
       in.readVectored(fileRanges2, allocate);
       validateVectoredReadResult(fileRanges2, DATASET);
       validateVectoredReadResult(fileRanges1, DATASET);
+      returnBuffersToPoolPostRead(fileRanges1, pool);
+      returnBuffersToPoolPostRead(fileRanges2, pool);
     }
   }
 
-  protected List<FileRange> createSomeOverlappingRanges() {
+  protected List<FileRange> createSampleNonOverlappingRanges() {
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(0, 100));
-    fileRanges.add(new FileRangeImpl(90, 50));
+    fileRanges.add(FileRange.createFileRange(0, 100));
+    fileRanges.add(FileRange.createFileRange(110, 50));
     return fileRanges;
   }
 
+  protected List<FileRange> getSampleSameRanges() {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(8_000, 1000));
+    fileRanges.add(FileRange.createFileRange(8_000, 1000));
+    fileRanges.add(FileRange.createFileRange(8_000, 1000));
+    return fileRanges;
+  }
 
-  protected void testExceptionalVectoredRead(FileSystem fs,
+  protected List<FileRange> getSampleOverlappingRanges() {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(100, 500));
+    fileRanges.add(FileRange.createFileRange(400, 500));
+    return fileRanges;
+  }
+
+  /**
+   * Validate that exceptions must be thrown during a vectored
+   * read operation with specific input ranges.
+   * @param fs FileSystem instance.
+   * @param fileRanges input file ranges.
+   * @param clazz type of exception expected.
+   * @throws Exception any other IOE.
+   */
+  protected <T extends Throwable> void testExceptionalVectoredRead(FileSystem 
fs,

Review Comment:
   Can you call this something other than test-, e.g. 
verifyExceptionalVectoredRead()?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,135 @@ public void readVectored(List<? extends FileRange> 
ranges,
 
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, 
ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges = 
validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
 
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for (FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
       }
     } else {
       LOG.debug("Trying to merge the ranges as they are not disjoint");
-      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+      List<CombinedFileRange> combinedFileRanges = 
mergeSortedRanges(sortedRanges,
               1, minSeekForVectorReads(),
               maxReadSizeForVectorReads());
       LOG.debug("Number of original ranges size {} , Number of combined ranges 
{} ",
               ranges.size(), combinedFileRanges.size());
-      for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
-        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-        combinedFileRange.setData(result);
+      for (CombinedFileRange combinedFileRange: combinedFileRanges) {
         unboundedThreadPool.submit(
-            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
buffer));
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
allocate));
       }
     }
     LOG.debug("Finished submitting vectored read to threadpool" +
             " on path {} for ranges {} ", pathStr, ranges);
   }
 
   /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all 
the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
    */
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange 
combinedFileRange,
-                                                  ByteBuffer buffer) {
-    // Not putting read single range call inside try block as
-    // exception if any occurred during this call will be raised
-    // during awaitFuture call while getting the combined buffer.
-    readSingleRange(combinedFileRange, buffer);
+                                                  IntFunction<ByteBuffer> 
allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", 
combinedFileRange, pathStr);
+    // This reference is must be kept till all buffers are populated as this 
is a
+    // finalizable object which closes the internal stream when gc triggers.
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
     try {
-      // In case of single range we return the original byte buffer else
-      // we return slice byte buffers for each child ranges.
-      ByteBuffer combinedBuffer = 
FutureIOSupport.awaitFuture(combinedFileRange.getData());
-      if (combinedFileRange.getUnderlying().size() == 1) {
-        
combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-      } else {
-        for (FileRange child : combinedFileRange.getUnderlying()) {
-          updateOriginalRange(child, combinedBuffer, combinedFileRange);
-        }
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,
+              combinedFileRange.getOffset(),
+              combinedFileRange.getLength());
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+                "Null IO stream received during " + operationName);
       }
+      populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
-      LOG.warn("Exception occurred while reading combined range from file {}", 
pathStr, ex);
+      LOG.warn("Exception while reading a range {} from path {} ", 
combinedFileRange, pathStr, ex);

Review Comment:
   Will we log noisily on an EOFException? That probably doesn't need a 
stack trace and can just be logged at DEBUG.



##########
hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java:
##########
@@ -47,7 +47,7 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.impl.FileRangeImpl;

Review Comment:
   This import shouldn't be needed now.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 783080)
    Time Spent: 2h 50m  (was: 2h 40m)

> Handle memory fragmentation in S3 Vectored IO implementation.
> -------------------------------------------------------------
>
>                 Key: HADOOP-18106
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18106
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>            Reporter: Mukund Thakur
>            Assignee: Mukund Thakur
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> As we have implemented merging of ranges in the S3AInputStream implementation 
> of the vectored IO API, it can lead to memory fragmentation. Let me explain with 
> an example.
>  
> Suppose a client requests 3 ranges: 
> 0-500, 700-1000 and 1200-1500.
> Because of merging, all of the above ranges will be merged into one, and we 
> will allocate a big byte buffer of size 0-1500 but return sliced byte buffers 
> for the desired ranges.
> Once the client is done reading all the ranges, it will only be able to 
> free the memory for the requested ranges; the memory of the gaps will never be 
> released (e.g. here, 500-700 and 1000-1200).
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to