[ 
https://issues.apache.org/jira/browse/HADOOP-18106?focusedWorklogId=783103&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783103
 ]

ASF GitHub Bot logged work on HADOOP-18106:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Jun/22 17:46
            Start Date: 20/Jun/22 17:46
    Worklog Time Spent: 10m 
      Work Description: mukund-thakur commented on code in PR #4445:
URL: https://github.com/apache/hadoop/pull/4445#discussion_r901891617


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,135 @@ public void readVectored(List<? extends FileRange> ranges,
 
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, 
ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
 
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for (FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
       }
     } else {
       LOG.debug("Trying to merge the ranges as they are not disjoint");
-      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+      List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
               1, minSeekForVectorReads(),
               maxReadSizeForVectorReads());
       LOG.debug("Number of original ranges size {} , Number of combined ranges 
{} ",
               ranges.size(), combinedFileRanges.size());
-      for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
-        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-        combinedFileRange.setData(result);
+      for (CombinedFileRange combinedFileRange: combinedFileRanges) {
         unboundedThreadPool.submit(
-            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer));
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
       }
     }
     LOG.debug("Finished submitting vectored read to threadpool" +
             " on path {} for ranges {} ", pathStr, ranges);
   }
 
   /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
    */
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
-                                                  ByteBuffer buffer) {
-    // Not putting read single range call inside try block as
-    // exception if any occurred during this call will be raised
-    // during awaitFuture call while getting the combined buffer.
-    readSingleRange(combinedFileRange, buffer);
+                                                  IntFunction<ByteBuffer> allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", 
combinedFileRange, pathStr);
+    // This reference must be kept until all the buffers are populated, as this
+    // is a finalizable object which closes the internal stream when GC triggers.
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
     try {
-      // In case of single range we return the original byte buffer else
-      // we return slice byte buffers for each child ranges.
-      ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData());
-      if (combinedFileRange.getUnderlying().size() == 1) {
-        combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-      } else {
-        for (FileRange child : combinedFileRange.getUnderlying()) {
-          updateOriginalRange(child, combinedBuffer, combinedFileRange);
-        }
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,
+              combinedFileRange.getOffset(),
+              combinedFileRange.getLength());
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+                "Null IO stream received during " + operationName);
       }
+      populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
-      LOG.warn("Exception occurred while reading combined range from file {}", 
pathStr, ex);
+      LOG.warn("Exception while reading a range {} from path {} ", 
combinedFileRange, pathStr, ex);

Review Comment:
   Okay, changing to debug.
   Or do you think it is better to separate the EOF exception into a different catch clause?
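   
   For illustration, a minimal, hypothetical sketch of that alternative (the class
   and method names here are invented for the example; only EOFException,
   IOException and InputStream are the standard java.io APIs):
   
       import java.io.EOFException;
       import java.io.IOException;
       import java.io.InputStream;
       
       // Hypothetical sketch, not the PR code: handle EOF in its own catch
       // clause, logging it at debug while other IO failures still log at warn.
       final class EofCatchSketch {
         static void readFully(InputStream in, byte[] dest) {
           try {
             int off = 0;
             while (off < dest.length) {
               int n = in.read(dest, off, dest.length - off);
               if (n < 0) {
                 // Ran out of data before the requested range was filled.
                 throw new EOFException("stream ended after " + off + " bytes");
               }
               off += n;
             }
           } catch (EOFException eof) {
             // EOF can be an expected condition (e.g. a range extending past
             // the end of the object), so keep it quiet at debug level.
             System.out.println("DEBUG: EOF while reading range: " + eof.getMessage());
           } catch (IOException ex) {
             // Anything else is unexpected and worth a warning.
             System.out.println("WARN: Exception while reading range: " + ex);
           }
         }
       }
   
   Note that EOFException extends IOException, so its catch clause must come first.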





Issue Time Tracking
-------------------

    Worklog Id:     (was: 783103)
    Time Spent: 3h 10m  (was: 3h)

> Handle memory fragmentation in S3 Vectored IO implementation.
> -------------------------------------------------------------
>
>                 Key: HADOOP-18106
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18106
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>            Reporter: Mukund Thakur
>            Assignee: Mukund Thakur
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Since we have implemented merging of ranges in the S3AInputStream
> implementation of the vectored IO API, it can lead to memory fragmentation.
> Let me explain by example.
>  
> Suppose a client requests 3 ranges:
> 0-500, 700-1000 and 1200-1500.
> Because of merging, all the above ranges will be merged into one, and we will
> allocate a single big byte buffer covering 0-1500 but return sliced byte
> buffers for the requested ranges.
> Once the client is done reading all the ranges, it can only free the memory of
> the requested ranges; the memory of the gaps (here 500-700 and 1000-1200) will
> never be released.
>  
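
A minimal sketch of the fragmentation described above (the class and method
names are invented for illustration; ByteBuffer.duplicate() and slice() are
standard java.nio APIs):

    import java.nio.ByteBuffer;
    
    // Hypothetical illustration: one backing buffer for the merged range
    // 0-1500, with zero-copy slices handed out for the requested sub-ranges.
    final class SliceFragmentationSketch {
    
      public static void main(String[] args) {
        ByteBuffer combined = ByteBuffer.allocate(1500); // merged range 0-1500
    
        ByteBuffer r1 = sliceRange(combined, 0, 500);    // requested 0-500
        ByteBuffer r2 = sliceRange(combined, 700, 300);  // requested 700-1000
        ByteBuffer r3 = sliceRange(combined, 1200, 300); // requested 1200-1500
    
        // All three slices share the combined buffer's single 1500-byte backing
        // array. While any one of them is still reachable, the whole array stays
        // live, including the gap bytes 500-700 and 1000-1200 nobody asked for.
        System.out.printf("capacities: %d %d %d%n",
            r1.capacity(), r2.capacity(), r3.capacity());
      }
    
      static ByteBuffer sliceRange(ByteBuffer buf, int offset, int length) {
        ByteBuffer dup = buf.duplicate(); // independent position/limit, shared data
        dup.position(offset);
        dup.limit(offset + length);
        return dup.slice();               // zero-copy view of [offset, offset+length)
      }
    }

With per-range allocation, as in the direction of this PR (each child range
gets its own buffer from the allocate function), every buffer can be garbage
collected independently once its range has been consumed.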



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

