mukund-thakur commented on code in PR #4427:
URL: https://github.com/apache/hadoop/pull/4427#discussion_r897448130
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,133 @@ public void readVectored(List<? extends FileRange> ranges,
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges =
+            validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for(FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
       }
     } else {
       LOG.debug("Trying to merge the ranges as they are not disjoint");
-      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+      List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
               1, minSeekForVectorReads(),
               maxReadSizeForVectorReads());
       LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
               ranges.size(), combinedFileRanges.size());
       for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
-        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-        combinedFileRange.setData(result);
         unboundedThreadPool.submit(
-            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer));
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
       }
     }
     LOG.debug("Finished submitting vectored read to threadpool" +
             " on path {} for ranges {} ", pathStr, ranges);
   }

   /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
    */
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
-          ByteBuffer buffer) {
-    // Not putting read single range call inside try block as
-    // exception if any occurred during this call will be raised
-    // during awaitFuture call while getting the combined buffer.
-    readSingleRange(combinedFileRange, buffer);
+          IntFunction<ByteBuffer> allocate) {
+    LOG.debug("Start reading combined range {} from path {} ",
+            combinedFileRange, pathStr);
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
     try {
-      // In case of single range we return the original byte buffer else
-      // we return slice byte buffers for each child ranges.
-      ByteBuffer combinedBuffer =
-              FutureIOSupport.awaitFuture(combinedFileRange.getData());
-      if (combinedFileRange.getUnderlying().size() == 1) {
-        combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-      } else {
-        for (FileRange child : combinedFileRange.getUnderlying()) {
-          updateOriginalRange(child, combinedBuffer, combinedFileRange);
-        }
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,
+              combinedFileRange.getOffset(),
+              combinedFileRange.getLength());
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+                "Null IO stream received during " + operationName);
       }
+      populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
-      LOG.warn("Exception occurred while reading combined range from file {}",
-              pathStr, ex);
+      LOG.warn("Exception while reading a range {} from path {} ",
+              combinedFileRange, pathStr, ex);
       for(FileRange child : combinedFileRange.getUnderlying()) {
         child.getData().completeExceptionally(ex);
       }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
+    }
+    LOG.debug("Finished reading range {} from path {} ", combinedFileRange, pathStr);
+  }
+
+  /**
+   * Populate underlying buffers of the child ranges.
+   * @param combinedFileRange big combined file range.
+   * @param objectContent data from s3.
+   * @param allocate method to allocate child byte buffers.
+   * @throws IOException any IOE.
+   */
+  private void populateChildBuffers(CombinedFileRange combinedFileRange,
+                                    S3ObjectInputStream objectContent,
+                                    IntFunction<ByteBuffer> allocate) throws IOException {
+    // If the combined file range just contains a single child
+    // range, we only have to fill that one child buffer else
+    // we drain the intermediate data between consecutive ranges
+    // and fill the buffers one by one.
+    if (combinedFileRange.getUnderlying().size() == 1) {
+      FileRange child = combinedFileRange.getUnderlying().get(0);
+      ByteBuffer buffer = allocate.apply(child.getLength());
+      populateBuffer(child.getLength(), buffer, objectContent);
+      child.getData().complete(buffer);
+    } else {
+      FileRange prev = null;
+      for (FileRange child : combinedFileRange.getUnderlying()) {
+        if (prev != null && prev.getOffset() + prev.getLength() < child.getOffset()) {
+          long drainQuantity = child.getOffset() - prev.getOffset() - prev.getLength();
+          drainUnnecessaryData(objectContent, drainQuantity);
+        }
+        ByteBuffer buffer = allocate.apply(child.getLength());
+        populateBuffer(child.getLength(), buffer, objectContent);
+        child.getData().complete(buffer);
+        prev = child;
+      }
+    }
   }

   /**
-   * Update data in child range from combined range.
-   * @param child child range.
-   * @param combinedBuffer combined buffer.
-   * @param combinedFileRange combined range.
+   * Drain unnecessary data in between ranges.
+   * @param objectContent s3 data stream.
+   * @param drainQuantity how many bytes to drain.
+   * @throws IOException any IOE.
    */
-  private void updateOriginalRange(FileRange child,
-                                   ByteBuffer combinedBuffer,
-                                   CombinedFileRange combinedFileRange) {
-    LOG.trace("Start Filling original range [{}, {}) from combined range [{}, {}) ",
-            child.getOffset(), child.getLength(),
-            combinedFileRange.getOffset(), combinedFileRange.getLength());
-    ByteBuffer childBuffer = sliceTo(combinedBuffer,
-            combinedFileRange.getOffset(), child);
-    child.getData().complete(childBuffer);
-    LOG.trace("End Filling original range [{}, {}) from combined range [{}, {}) ",
-            child.getOffset(), child.getLength(),
-            combinedFileRange.getOffset(), combinedFileRange.getLength());
+  private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQuantity)
Review Comment:
Not right now. I plan to add IOStatistics, auditing and the vectored IO read policy later, directly in trunk. Don't want the feature branch to be super long-lived, as discussed.
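
A note for readers: the hunk above is truncated before the body of drainUnnecessaryData. As a rough illustration of the draining step between merged child ranges, a minimal sketch over a plain java.io.InputStream follows. The DrainSketch class and drain method are invented names for this example; this is not the PR's actual implementation.

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    // Hypothetical helper, for illustration only.
    final class DrainSketch {
      // Read and discard exactly drainQuantity bytes from the stream,
      // failing if the stream ends early.
      static void drain(InputStream in, long drainQuantity) throws IOException {
        byte[] scratch = new byte[8192];  // scratch buffer; size is arbitrary
        long remaining = drainQuantity;
        while (remaining > 0) {
          int read = in.read(scratch, 0, (int) Math.min(scratch.length, remaining));
          if (read < 0) {
            throw new EOFException(
                "Stream ended with " + remaining + " bytes left to drain");
          }
          remaining -= read;
        }
      }
    }

The PR's version works on the S3ObjectInputStream of the merged GET request; the sketch only shows the general technique of reading into a scratch buffer and discarding until the gap is consumed.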
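
For context on how this code path gets exercised, here is a caller-side sketch of the vectored read API, assuming the FileRange.createFileRange factory and the default PositionedReadable.readVectored(ranges, allocate) signature as they appear in current trunk; the exact API surface was still settling while this feature branch was active.

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileRange;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class VectoredReadSketch {
      // Issue one vectored read for two ranges; the S3A stream decides
      // whether to read them separately or merge them into one GET.
      static void readTwoRanges(FileSystem fs, Path path) throws Exception {
        try (FSDataInputStream in = fs.open(path)) {
          List<FileRange> ranges = Arrays.asList(
              FileRange.createFileRange(0, 4096),      // first 4 KB
              FileRange.createFileRange(8192, 4096));  // 4 KB starting at offset 8192
          in.readVectored(ranges, ByteBuffer::allocate);
          for (FileRange range : ranges) {
            ByteBuffer data = range.getData().get();   // blocks until this range completes
            // ... consume data ...
          }
        }
      }
    }

Whether the two ranges are read separately or merged into one request depends on isOrderedDisjoint, minSeekForVectorReads() and maxReadSizeForVectorReads(), which is exactly the branch taken in readVectored above.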