[ https://issues.apache.org/jira/browse/HADOOP-18106?focusedWorklogId=783103&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783103 ]
ASF GitHub Bot logged work on HADOOP-18106:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Jun/22 17:46
            Start Date: 20/Jun/22 17:46
    Worklog Time Spent: 10m
      Work Description: mukund-thakur commented on code in PR #4445:
URL: https://github.com/apache/hadoop/pull/4445#discussion_r901891617


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########

@@ -940,90 +949,135 @@ public void readVectored(List<? extends FileRange> ranges,
     LOG.debug("Starting vectored read on path {} for ranges {} ",
         pathStr, ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for (FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
       }
     } else {
       LOG.debug("Trying to merge the ranges as they are not disjoint");
-      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+      List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
           1, minSeekForVectorReads(),
           maxReadSizeForVectorReads());
       LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
           ranges.size(), combinedFileRanges.size());
-      for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
-        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-        combinedFileRange.setData(result);
+      for (CombinedFileRange combinedFileRange: combinedFileRanges) {
         unboundedThreadPool.submit(
-            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer));
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
       }
     }
     LOG.debug("Finished submitting vectored read to threadpool"
         + " on path {} for ranges {} ", pathStr, ranges);
   }

   /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
    */
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
-      ByteBuffer buffer) {
-    // Not putting read single range call inside try block as
-    // exception if any occurred during this call will be raised
-    // during awaitFuture call while getting the combined buffer.
-    readSingleRange(combinedFileRange, buffer);
+      IntFunction<ByteBuffer> allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
+    // This reference must be kept till all buffers are populated as this is a
+    // finalizable object which closes the internal stream when GC triggers.
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
     try {
-      // In case of single range we return the original byte buffer else
-      // we return slice byte buffers for each child ranges.
-      ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData());
-      if (combinedFileRange.getUnderlying().size() == 1) {
-        combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-      } else {
-        for (FileRange child : combinedFileRange.getUnderlying()) {
-          updateOriginalRange(child, combinedBuffer, combinedFileRange);
-        }
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,
+          combinedFileRange.getOffset(),
+          combinedFileRange.getLength());
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+            "Null IO stream received during " + operationName);
       }
+      populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
-      LOG.warn("Exception occurred while reading combined range from file {}", pathStr, ex);
+      LOG.warn("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);

Review Comment:
   Okay, changing to debug. Or do you think it is better to separate the EOF exception into a different catch clause?


Issue Time Tracking
-------------------

    Worklog Id:     (was: 783103)
    Time Spent: 3h 10m  (was: 3h)

> Handle memory fragmentation in S3 Vectored IO implementation.
> --------------------------------------------------------------
>
>                 Key: HADOOP-18106
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18106
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>            Reporter: Mukund Thakur
>            Assignee: Mukund Thakur
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Since we have implemented merging of ranges in the S3AInputStream
> implementation of the vectored IO API, it can lead to memory fragmentation.
> An example:
>
> Suppose a client requests 3 ranges: 0-500, 700-1000 and 1200-1500.
> Because of merging, all of the above ranges get merged into one, so we
> allocate one big byte buffer covering 0-1500 but return sliced byte buffers
> for the requested ranges.
> Once the client is done reading the ranges, it can only release the memory
> of the requested ranges; the memory of the gaps (here 500-700 and 1000-1200)
> is never freed.
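
To make the gap retention concrete, here is a minimal, self-contained sketch
(illustrative only, not the S3AInputStream code; the range values follow the
example above) showing why slices of one combined heap buffer keep the whole
allocation alive:

{code:java}
import java.nio.ByteBuffer;

/**
 * Demonstrates the fragmentation described above: one combined buffer
 * covering bytes 0-1500 is sliced into the three requested ranges, so
 * the gap bytes (500-700 and 1000-1200) stay allocated for as long as
 * any single slice is reachable.
 */
public class SliceFragmentationDemo {

  public static void main(String[] args) {
    // One big allocation for the merged range 0-1500.
    ByteBuffer combined = ByteBuffer.allocate(1500);

    // Carve out slices for the requested ranges only.
    ByteBuffer r1 = sliceRange(combined, 0, 500);     // 0-500
    ByteBuffer r2 = sliceRange(combined, 700, 300);   // 700-1000
    ByteBuffer r3 = sliceRange(combined, 1200, 300);  // 1200-1500

    // All three slices share the combined buffer's backing array, so
    // even after the client drops r1 and r2, holding just r3 keeps the
    // full 1500 bytes (including the 400 gap bytes) from being GCed.
    System.out.println(r1.array() == combined.array()); // true
    System.out.println(r3.capacity());                  // 300 bytes visible
    System.out.println(r3.array().length);              // 1500 bytes retained
  }

  /** Slice [offset, offset + length) out of a heap buffer. */
  private static ByteBuffer sliceRange(ByteBuffer buf, int offset, int length) {
    ByteBuffer dup = buf.duplicate();
    dup.position(offset);
    dup.limit(offset + length);
    return dup.slice();
  }
}
{code}

For direct buffers the effect is the same: a slice pins the parent allocation
until every slice becomes unreachable.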
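The patch avoids this by giving each child range its own exactly-sized buffer
and draining the combined S3 stream sequentially. The populateChildBuffers
method is introduced by the PR but its body is not part of this hunk, so the
following is only a rough sketch of that idea, with a plain InputStream
standing in for S3ObjectInputStream and a local Range record standing in for
FileRange:

{code:java}
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.IntFunction;

/**
 * Sketch of per-range buffer population: read the merged range as one
 * sequential stream, give every child range its own buffer from the
 * allocate function, and discard the gap bytes in between.
 */
public class PerRangeFillSketch {

  /** Stand-in for org.apache.hadoop.fs.FileRange. */
  public record Range(long offset, int length) { }

  public static void fillChildBuffers(List<Range> sortedChildren,
                                      long combinedOffset,
                                      InputStream objectContent,
                                      IntFunction<ByteBuffer> allocate)
      throws IOException {
    long position = combinedOffset;
    for (Range child : sortedChildren) {
      // Skip the gap between the previous child and this one; these
      // bytes come off the wire but are never buffered.
      skipFully(objectContent, child.offset() - position);
      // Each child gets its own exactly-sized buffer, so releasing one
      // range releases its memory independently of the others.
      ByteBuffer buffer = allocate.apply(child.length());
      byte[] chunk = new byte[child.length()];
      readFully(objectContent, chunk);
      buffer.put(chunk).flip();
      position = child.offset() + child.length();
      // In the real code the buffer would complete the range's future here.
    }
  }

  private static void skipFully(InputStream in, long n) throws IOException {
    while (n > 0) {
      long skipped = in.skip(n);
      if (skipped <= 0) {
        // skip() may make no progress; fall back to reading one byte.
        if (in.read() < 0) {
          throw new EOFException("Stream ended inside a gap");
        }
        skipped = 1;
      }
      n -= skipped;
    }
  }

  private static void readFully(InputStream in, byte[] dest) throws IOException {
    int off = 0;
    while (off < dest.length) {
      int read = in.read(dest, off, dest.length - off);
      if (read < 0) {
        throw new EOFException("Stream ended inside a child range");
      }
      off += read;
    }
  }
}
{code}

The gap bytes are still transferred over the network, but they are never
buffered, so no memory stays stranded once the per-range futures complete.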