[
https://issues.apache.org/jira/browse/HADOOP-18106?focusedWorklogId=781440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781440
]
ASF GitHub Bot logged work on HADOOP-18106:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 15/Jun/22 01:39
Start Date: 15/Jun/22 01:39
Worklog Time Spent: 10m
Work Description: mukund-thakur commented on code in PR #4427:
URL: https://github.com/apache/hadoop/pull/4427#discussion_r897448130
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,133 @@ public void readVectored(List<? extends FileRange> ranges,
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr,
             ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges =
+            validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for(FileRange range: sortedRanges) {
        ByteBuffer buffer = allocate.apply(range.getLength());
        unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
      }
    } else {
      LOG.debug("Trying to merge the ranges as they are not disjoint");
-      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+      List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
              1, minSeekForVectorReads(),
              maxReadSizeForVectorReads());
      LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
              ranges.size(), combinedFileRanges.size());
      for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
-        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-        combinedFileRange.setData(result);
        unboundedThreadPool.submit(
-            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer));
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
      }
    }
    LOG.debug("Finished submitting vectored read to threadpool" +
            " on path {} for ranges {} ", pathStr, ranges);
  }

  /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
    */
  private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
-                                                  ByteBuffer buffer) {
-    // Not putting read single range call inside try block as
-    // exception if any occurred during this call will be raised
-    // during awaitFuture call while getting the combined buffer.
-    readSingleRange(combinedFileRange, buffer);
+                                                  IntFunction<ByteBuffer> allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
    try {
-      // In case of single range we return the original byte buffer else
-      // we return slice byte buffers for each child ranges.
-      ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData());
-      if (combinedFileRange.getUnderlying().size() == 1) {
-        combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-      } else {
-        for (FileRange child : combinedFileRange.getUnderlying()) {
-          updateOriginalRange(child, combinedBuffer, combinedFileRange);
-        }
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,
+              combinedFileRange.getOffset(),
+              combinedFileRange.getLength());
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+                "Null IO stream received during " + operationName);
      }
+      populateChildBuffers(combinedFileRange, objectContent, allocate);
    } catch (Exception ex) {
-      LOG.warn("Exception occurred while reading combined range from file {}", pathStr, ex);
+      LOG.warn("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
      for(FileRange child : combinedFileRange.getUnderlying()) {
        child.getData().completeExceptionally(ex);
      }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
+    }
+    LOG.debug("Finished reading range {} from path {} ", combinedFileRange, pathStr);
+  }
+
+  /**
+   * Populate underlying buffers of the child ranges.
+   * @param combinedFileRange big combined file range.
+   * @param objectContent data from s3.
+   * @param allocate method to allocate child byte buffers.
+   * @throws IOException any IOE.
+   */
+  private void populateChildBuffers(CombinedFileRange combinedFileRange,
+                                    S3ObjectInputStream objectContent,
+                                    IntFunction<ByteBuffer> allocate) throws IOException {
+    // If the combined file range just contains a single child
+    // range, we only have to fill that one child buffer else
+    // we drain the intermediate data between consecutive ranges
+    // and fill the buffers one by one.
+    if (combinedFileRange.getUnderlying().size() == 1) {
+      FileRange child = combinedFileRange.getUnderlying().get(0);
+      ByteBuffer buffer = allocate.apply(child.getLength());
+      populateBuffer(child.getLength(), buffer, objectContent);
+      child.getData().complete(buffer);
+    } else {
+      FileRange prev = null;
+      for (FileRange child : combinedFileRange.getUnderlying()) {
+        if (prev != null && prev.getOffset() + prev.getLength() < child.getOffset()) {
+          long drainQuantity = child.getOffset() - prev.getOffset() - prev.getLength();
+          drainUnnecessaryData(objectContent, drainQuantity);
+        }
+        ByteBuffer buffer = allocate.apply(child.getLength());
+        populateBuffer(child.getLength(), buffer, objectContent);
+        child.getData().complete(buffer);
+        prev = child;
+      }
    }
  }

  /**
-   * Update data in child range from combined range.
-   * @param child child range.
-   * @param combinedBuffer combined buffer.
-   * @param combinedFileRange combined range.
+   * Drain unnecessary data in between ranges.
+   * @param objectContent s3 data stream.
+   * @param drainQuantity how many bytes to drain.
+   * @throws IOException any IOE.
    */
-  private void updateOriginalRange(FileRange child,
-                                   ByteBuffer combinedBuffer,
-                                   CombinedFileRange combinedFileRange) {
-    LOG.trace("Start Filling original range [{}, {}) from combined range [{}, {}) ",
-            child.getOffset(), child.getLength(),
-            combinedFileRange.getOffset(), combinedFileRange.getLength());
-    ByteBuffer childBuffer = sliceTo(combinedBuffer, combinedFileRange.getOffset(), child);
-    child.getData().complete(childBuffer);
-    LOG.trace("End Filling original range [{}, {}) from combined range [{}, {}) ",
-            child.getOffset(), child.getLength(),
-            combinedFileRange.getOffset(), combinedFileRange.getLength());
+  private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQuantity)
Review Comment:
Not right now. I plan to add IoStats, auditing and the vectored IO read policy
later, directly in trunk. We don't want the feature branch to be super
long-lived, as discussed.
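
For readers following the thread: the essence of the new
readCombinedRangeAndUpdateChildren / populateChildBuffers path is to read the
merged range as a stream, allocate one exact-sized buffer per child range, and
skip (drain) the gap bytes between consecutive children. Below is a minimal
standalone sketch of that idea, using a plain java.io.InputStream in place of
S3ObjectInputStream; the class and helper names are illustrative, not the
actual Hadoop code.

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;

public class DrainAndFillSketch {

  /** Simplified stand-in for a child FileRange: [offset, offset + length). */
  record Range(long offset, int length) { }

  /**
   * Read sorted, non-overlapping ranges from a stream positioned at the start
   * of the first range: allocate one exact-sized buffer per range and drain
   * the gap bytes between consecutive ranges, so no buffer memory is wasted
   * on the gaps.
   */
  static ByteBuffer[] readRanges(InputStream in, List<Range> ranges) throws IOException {
    ByteBuffer[] results = new ByteBuffer[ranges.size()];
    long position = ranges.get(0).offset();
    for (int i = 0; i < ranges.size(); i++) {
      Range r = ranges.get(i);
      // Drain the unwanted bytes between the previous range and this one.
      long toDrain = r.offset() - position;
      while (toDrain > 0) {
        long skipped = in.skip(toDrain);
        if (skipped <= 0) {
          throw new EOFException("Could not drain " + toDrain + " gap bytes");
        }
        toDrain -= skipped;
      }
      // Fill a buffer sized exactly for this child range.
      byte[] data = in.readNBytes(r.length());
      if (data.length < r.length()) {
        throw new EOFException("Stream ended inside range at offset " + r.offset());
      }
      results[i] = ByteBuffer.wrap(data);
      position = r.offset() + r.length();
    }
    return results;
  }

  public static void main(String[] args) throws IOException {
    // The ranges from the JIRA example: 0-500, 700-1000 and 1200-1500,
    // served by a single merged 0-1500 read.
    byte[] object = new byte[1500];
    ByteBuffer[] buffers = readRanges(new ByteArrayInputStream(object),
        List.of(new Range(0, 500), new Range(700, 300), new Range(1200, 300)));
    System.out.println(buffers[2].remaining()); // 300 bytes for range 1200-1500
  }
}

Each buffer here is independently garbage-collectable once the caller drops it,
which is exactly the property the patch is after.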
Issue Time Tracking
-------------------
Worklog Id: (was: 781440)
Time Spent: 1.5h (was: 1h 20m)
> Handle memory fragmentation in S3 Vectored IO implementation.
> -------------------------------------------------------------
>
> Key: HADOOP-18106
> URL: https://issues.apache.org/jira/browse/HADOOP-18106
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Reporter: Mukund Thakur
> Assignee: Mukund Thakur
> Priority: Major
> Labels: pull-request-available
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Because we have implemented merging of ranges in the S3AInputStream
> implementation of the vectored IO API, it can lead to memory fragmentation.
> Let me explain by example.
>
> Suppose a client requests 3 ranges:
> 0-500, 700-1000 and 1200-1500.
> Because of merging, all the above ranges will get merged into one, and we
> will allocate one big byte buffer covering 0-1500 but return sliced byte
> buffers for the requested ranges.
> Once the client is done reading all the ranges, it can only release the
> memory of the requested ranges; the memory of the gaps (here 500-700 and
> 1000-1200) will never be released, because any live slice keeps the whole
> backing buffer reachable.
>
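
To make the fragmentation concrete, here is a small illustrative snippet (not
code from the Hadoop repository) showing why the slicing approach pins the gap
memory; ByteBuffer.slice(int, int) requires Java 13 or later.

import java.nio.ByteBuffer;

public class SliceFragmentationDemo {
  public static void main(String[] args) {
    // Old approach: one buffer backs the whole merged range 0-1500.
    ByteBuffer combined = ByteBuffer.allocate(1500);

    // Sliced views handed back for the requested ranges
    // 0-500, 700-1000 and 1200-1500.
    ByteBuffer r1 = combined.slice(0, 500);
    ByteBuffer r2 = combined.slice(700, 300);
    ByteBuffer r3 = combined.slice(1200, 300);

    // All three slices share the 1500-byte backing array of 'combined'.
    System.out.println(r1.array() == combined.array()); // true

    // Consequence: even after the client drops r1 and r2, the gap bytes
    // (500-700 and 1000-1200) cannot be reclaimed while r3 is still
    // reachable, because the garbage collector can only free the backing
    // array as a whole.
  }
}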
--
This message was sent by Atlassian Jira
(v8.20.7#820007)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]