[ 
https://issues.apache.org/jira/browse/HADOOP-18106?focusedWorklogId=781440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781440
 ]

ASF GitHub Bot logged work on HADOOP-18106:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jun/22 01:39
            Start Date: 15/Jun/22 01:39
    Worklog Time Spent: 10m 
      Work Description: mukund-thakur commented on code in PR #4427:
URL: https://github.com/apache/hadoop/pull/4427#discussion_r897448130


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,133 @@ public void readVectored(List<? extends FileRange> 
ranges,
 
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, 
ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges = 
validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
 
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for(FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
       }
     } else {
       LOG.debug("Trying to merge the ranges as they are not disjoint");
-      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+      List<CombinedFileRange> combinedFileRanges = 
mergeSortedRanges(sortedRanges,
               1, minSeekForVectorReads(),
               maxReadSizeForVectorReads());
       LOG.debug("Number of original ranges size {} , Number of combined ranges 
{} ",
               ranges.size(), combinedFileRanges.size());
       for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
-        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-        combinedFileRange.setData(result);
         unboundedThreadPool.submit(
-            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
buffer));
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
allocate));
       }
     }
     LOG.debug("Finished submitting vectored read to threadpool" +
             " on path {} for ranges {} ", pathStr, ranges);
   }
 
   /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all 
the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
    */
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange 
combinedFileRange,
-                                                  ByteBuffer buffer) {
-    // Not putting read single range call inside try block as
-    // exception if any occurred during this call will be raised
-    // during awaitFuture call while getting the combined buffer.
-    readSingleRange(combinedFileRange, buffer);
+                                                  IntFunction<ByteBuffer> 
allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", 
combinedFileRange, pathStr);
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
     try {
-      // In case of single range we return the original byte buffer else
-      // we return slice byte buffers for each child ranges.
-      ByteBuffer combinedBuffer = 
FutureIOSupport.awaitFuture(combinedFileRange.getData());
-      if (combinedFileRange.getUnderlying().size() == 1) {
-        
combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-      } else {
-        for (FileRange child : combinedFileRange.getUnderlying()) {
-          updateOriginalRange(child, combinedBuffer, combinedFileRange);
-        }
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,
+              combinedFileRange.getOffset(),
+              combinedFileRange.getLength());
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+                "Null IO stream received during " + operationName);
       }
+      populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
-      LOG.warn("Exception occurred while reading combined range from file {}", 
pathStr, ex);
+      LOG.warn("Exception while reading a range {} from path {} ", 
combinedFileRange, pathStr, ex);
       for(FileRange child : combinedFileRange.getUnderlying()) {
         child.getData().completeExceptionally(ex);
       }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
+    }
+    LOG.debug("Finished reading range {} from path {} ", combinedFileRange, 
pathStr);
+  }
+
+  /**
+   * Populate underlying buffers of the child ranges.
+   * @param combinedFileRange big combined file range.
+   * @param objectContent data from s3.
+   * @param allocate method to allocate child byte buffers.
+   * @throws IOException any IOE.
+   */
+  private void populateChildBuffers(CombinedFileRange combinedFileRange,
+                                    S3ObjectInputStream objectContent,
+                                    IntFunction<ByteBuffer> allocate) throws 
IOException {
+    // If the combined file range just contains a single child
+    // range, we only have to fill that one child buffer else
+    // we drain the intermediate data between consecutive ranges
+    // and fill the buffers one by one.
+    if (combinedFileRange.getUnderlying().size() == 1) {
+      FileRange child = combinedFileRange.getUnderlying().get(0);
+      ByteBuffer buffer = allocate.apply(child.getLength());
+      populateBuffer(child.getLength(), buffer, objectContent);
+      child.getData().complete(buffer);
+    } else {
+      FileRange prev = null;
+      for (FileRange child : combinedFileRange.getUnderlying()) {
+        if (prev != null && prev.getOffset() + prev.getLength() < 
child.getOffset()) {
+          long drainQuantity = child.getOffset() - prev.getOffset() - 
prev.getLength();
+          drainUnnecessaryData(objectContent, drainQuantity);
+        }
+        ByteBuffer buffer = allocate.apply(child.getLength());
+        populateBuffer(child.getLength(), buffer, objectContent);
+        child.getData().complete(buffer);
+        prev = child;
+      }
     }
   }
 
   /**
-   * Update data in child range from combined range.
-   * @param child child range.
-   * @param combinedBuffer combined buffer.
-   * @param combinedFileRange combined range.
+   * Drain unnecessary data in between ranges.
+   * @param objectContent s3 data stream.
+   * @param drainQuantity how many bytes to drain.
+   * @throws IOException any IOE.
    */
-  private void updateOriginalRange(FileRange child,
-                                   ByteBuffer combinedBuffer,
-                                   CombinedFileRange combinedFileRange) {
-    LOG.trace("Start Filling original range [{}, {}) from combined range [{}, 
{}) ",
-            child.getOffset(), child.getLength(),
-            combinedFileRange.getOffset(), combinedFileRange.getLength());
-    ByteBuffer childBuffer = sliceTo(combinedBuffer, 
combinedFileRange.getOffset(), child);
-    child.getData().complete(childBuffer);
-    LOG.trace("End Filling original range [{}, {}) from combined range [{}, 
{}) ",
-            child.getOffset(), child.getLength(),
-            combinedFileRange.getOffset(), combinedFileRange.getLength());
+  private void drainUnnecessaryData(S3ObjectInputStream objectContent, long 
drainQuantity)

Review Comment:
   Not right now. I plan to add IoStats, auditing and VEC IO read policy later 
directly in trunk. Don't want the feature branch to super long lived as 
discussed.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 781440)
    Time Spent: 1.5h  (was: 1h 20m)

> Handle memory fragmentation in S3 Vectored IO implementation.
> -------------------------------------------------------------
>
>                 Key: HADOOP-18106
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18106
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>            Reporter: Mukund Thakur
>            Assignee: Mukund Thakur
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> As we have implemented merging of ranges in the S3AInputStream implementation 
> of vectored IO api, it can lead to memory fragmentation. Let me explain by 
> example.
>  
> Suppose client requests for 3 ranges. 
> 0-500, 700-1000 and 1200-1500.
> Now because of merging, all the above ranges will get merged into one and we 
> will allocate a big byte buffer of 0-1500 size but return sliced byte buffers 
> for the desired ranges.
> Now once the client is done reading all the ranges, it will only be able to 
> free the memory for requested ranges and memory of the gaps will never be 
> released for eg here (500-700 and 1000-1200).
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to