[
https://issues.apache.org/jira/browse/HADOOP-18106?focusedWorklogId=783103&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783103
]
ASF GitHub Bot logged work on HADOOP-18106:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jun/22 17:46
Start Date: 20/Jun/22 17:46
Worklog Time Spent: 10m
Work Description: mukund-thakur commented on code in PR #4445:
URL: https://github.com/apache/hadoop/pull/4445#discussion_r901891617
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,135 @@ public void readVectored(List<? extends FileRange> ranges,
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for (FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
       }
     } else {
       LOG.debug("Trying to merge the ranges as they are not disjoint");
-      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+      List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
           1, minSeekForVectorReads(),
           maxReadSizeForVectorReads());
       LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
           ranges.size(), combinedFileRanges.size());
-      for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
-        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-        combinedFileRange.setData(result);
+      for (CombinedFileRange combinedFileRange: combinedFileRanges) {
         unboundedThreadPool.submit(
-            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer));
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
       }
     }
     LOG.debug("Finished submitting vectored read to threadpool" +
         " on path {} for ranges {} ", pathStr, ranges);
   }
   /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
    */
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
-          ByteBuffer buffer) {
-    // Not putting read single range call inside try block as
-    // exception if any occurred during this call will be raised
-    // during awaitFuture call while getting the combined buffer.
-    readSingleRange(combinedFileRange, buffer);
+          IntFunction<ByteBuffer> allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
+    // This reference must be kept till all buffers are populated as this is a
+    // finalizable object which closes the internal stream when gc triggers.
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
     try {
-      // In case of single range we return the original byte buffer else
-      // we return slice byte buffers for each child ranges.
-      ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData());
-      if (combinedFileRange.getUnderlying().size() == 1) {
-        combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-      } else {
-        for (FileRange child : combinedFileRange.getUnderlying()) {
-          updateOriginalRange(child, combinedBuffer, combinedFileRange);
-        }
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,
+          combinedFileRange.getOffset(),
+          combinedFileRange.getLength());
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+            "Null IO stream received during " + operationName);
       }
+      populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
-      LOG.warn("Exception occurred while reading combined range from file {}", pathStr, ex);
+      LOG.warn("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
Review Comment:
Okay, changing to debug. Or do you think it would be better to handle the EOF
exception in a separate catch clause?
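
For illustration only, a minimal, self-contained sketch of the pattern in question: catch EOFException in its own clause so an expected short read is logged quietly, while other I/O failures are logged more loudly. It uses plain JDK types rather than the S3AInputStream fields, and every name below is invented for the example, not taken from the PR:

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

final class EofCatchSketch {
  /** Fill dest completely from the stream, distinguishing EOF from other failures. */
  static void readFully(InputStream stream, byte[] dest) throws IOException {
    try {
      int read = 0;
      while (read < dest.length) {
        int n = stream.read(dest, read, dest.length - read);
        if (n < 0) {
          throw new EOFException("EOF after " + read + " of " + dest.length + " bytes");
        }
        read += n;
      }
    } catch (EOFException eof) {
      // Expected when a requested range extends past the end of the object:
      // keep the log quiet and rethrow so only this range fails.
      System.out.println("debug: short read: " + eof.getMessage());
      throw eof;
    } catch (IOException ex) {
      // Any other I/O failure is unexpected and worth a louder log.
      System.err.println("warn: failed to read range: " + ex.getMessage());
      throw ex;
    }
  }
}

Whether the S3A code should log at debug or split the catch clause is exactly the open question above; the sketch only shows the shape of the second option.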
Issue Time Tracking
-------------------
Worklog Id: (was: 783103)
Time Spent: 3h 10m (was: 3h)
> Handle memory fragmentation in S3 Vectored IO implementation.
> -------------------------------------------------------------
>
> Key: HADOOP-18106
> URL: https://issues.apache.org/jira/browse/HADOOP-18106
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Reporter: Mukund Thakur
> Assignee: Mukund Thakur
> Priority: Major
> Labels: pull-request-available
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> As we have implemented merging of ranges in the S3AInputStream implementation
> of the vectored IO API, it can lead to memory fragmentation. Let me explain
> with an example.
>
> Suppose the client requests 3 ranges: 0-500, 700-1000 and 1200-1500.
> Because of merging, all the above ranges will be merged into one, and we will
> allocate one big byte buffer covering 0-1500 but return sliced byte buffers
> for the requested ranges.
> Once the client is done reading all the ranges, it will only be able to free
> the memory for the requested ranges; the memory of the gaps (here 500-700 and
> 1000-1200) will never be released. See the sketch below.
>
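
To make the fragmentation concrete, here is a small, self-contained sketch using plain JDK ByteBuffers (it is not the S3A code; the class and method names are invented for the example). Every slice cut from the combined buffer is only a view over the same backing array, so the gap bytes stay allocated for as long as any slice is still referenced:

import java.nio.ByteBuffer;

public class SliceFragmentationDemo {

  /** Return a sliced view of [offset, offset + length) of the combined buffer. */
  static ByteBuffer sliceRange(ByteBuffer combined, int offset, int length) {
    ByteBuffer view = combined.duplicate();
    view.position(offset);
    view.limit(offset + length);
    return view.slice();
  }

  public static void main(String[] args) {
    // One combined allocation covering bytes 0-1500, as the merged read would do.
    ByteBuffer combined = ByteBuffer.allocate(1500);

    // Sliced views for the requested ranges 0-500, 700-1000 and 1200-1500.
    ByteBuffer r1 = sliceRange(combined, 0, 500);
    ByteBuffer r2 = sliceRange(combined, 700, 300);
    ByteBuffer r3 = sliceRange(combined, 1200, 300);

    // All three slices share the single 1500-byte backing array, so the
    // 500-700 and 1000-1200 gaps cannot be freed until every slice is gone.
    System.out.println(r1.array() == r2.array() && r2.array() == r3.array()); // true
  }
}

This is the motivation for the change in the diff above, which allocates a separate buffer per child range through the allocate function instead of slicing one combined buffer.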