[
https://issues.apache.org/jira/browse/HADOOP-18231?focusedWorklogId=780016&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-780016
]
ASF GitHub Bot logged work on HADOOP-18231:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 09/Jun/22 15:46
Start Date: 09/Jun/22 15:46
Worklog Time Spent: 10m
Work Description: ahmarsuhail commented on code in PR #4386:
URL: https://github.com/apache/hadoop/pull/4386#discussion_r893675594
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +213,83 @@ void close(InputStream inputStream) {
       this.s3Objects.remove(inputStream);
     }
+    if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
+      // don't bother with async io.
+      drain(false, "close() operation", numRemainingBytes, obj, inputStream);
+    } else {
+      LOG.debug("initiating asynchronous drain of {} bytes",
+          numRemainingBytes);
+      // schedule an async drain/abort with references to the fields so they
+      // can be reused
+      client.submit(() -> drain(false, "close() operation",
+          numRemainingBytes, obj, inputStream));
+    }
+  }
+
+  /**
+   * Drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   *
+   * @param shouldAbort force an abort; used if explicitly requested.
+   * @param reason reason for stream being closed; used in messages
+   * @param remaining remaining bytes
+   * @param requestObject http request object
+   * @param inputStream stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drain(final boolean shouldAbort, final String reason,
+      final long remaining, final S3Object requestObject,
+      final InputStream inputStream) {
+
+    try {
+      return invokeTrackingDuration(
+          streamStatistics.initiateInnerStreamClose(shouldAbort),
+          () -> drainOrAbortHttpStream(shouldAbort, reason, remaining,
+              requestObject, inputStream));
+    } catch (IOException e) {
+      // this is only here because invokeTrackingDuration() has it in its
+      // signature
+      return shouldAbort;
+    }
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   *
+   * @param shouldAbort force an abort; used if explicitly requested.
+   * @param reason reason for stream being closed; used in messages
+   * @param remaining remaining bytes
+   * @param requestObject http request object
+   * @param inputStream stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drainOrAbortHttpStream(boolean shouldAbort,
+      final String reason, final long remaining,
+      final S3Object requestObject, final InputStream inputStream) {
+
+    if (!shouldAbort && remaining > 0) {
+      try {
+        long drained = 0;
+        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
+        while (true) {
+          final int count = inputStream.read(buffer);
+          if (count < 0) {
+            // no more data is left
+            break;
+          }
+          drained += count;
+        }
+        LOG.debug("Drained stream of {} bytes", drained);
+      } catch (Exception e) {
+        // exception escalates to an abort
+        LOG.debug("When closing {} stream for {}, will abort the stream",
+            uri, reason, e);
+        shouldAbort = true;
+      }
+    }
     Io.closeIgnoringIoException(inputStream);
-    Io.closeIgnoringIoException(obj);
+    Io.closeIgnoringIoException(requestObject);
Review Comment:
This was a new `Io` class added as part of the initial prefetching PR. I've
removed this class and updated its usages to use `cleanupWithLogger`.
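
For illustration, a minimal sketch of what that replacement looks like
(assuming the class's existing slf4j `LOG` field; `cleanupWithLogger` is the
helper in `org.apache.hadoop.io.IOUtils`):

    import org.apache.hadoop.io.IOUtils;

    // close the wrapped HTTP stream and the S3Object quietly;
    // any exception raised during close() is logged at debug
    // level instead of being rethrown to the caller
    IOUtils.cleanupWithLogger(LOG, inputStream, requestObject);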
Issue Time Tracking
-------------------
Worklog Id: (was: 780016)
Time Spent: 3h 20m (was: 3h 10m)
> tests in ITestS3AInputStreamPerformance are failing
> ----------------------------------------------------
>
> Key: HADOOP-18231
> URL: https://issues.apache.org/jira/browse/HADOOP-18231
> Project: Hadoop Common
> Issue Type: Sub-task
> Reporter: Ahmar Suhail
> Assignee: Ahmar Suhail
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> The following tests fail when prefetching is enabled:
> testRandomIORandomPolicy - expects the stream to be opened 4 times (once
> for every random read), but prefetching only opens it twice.
> testDecompressionSequential128K - expects the stream to be opened once, but
> prefetching opens it once for each block of the file. The landsat file used
> in the test is 42MB and the prefetching block size is 8MB, so the expected
> open count is 6 (see the sketch after this quote).
> testReadWithNormalPolicy - same as above.
> testRandomIONormalPolicy - executes random IO, but with a normal policy.
> S3AInputStream would abort the stream and switch the policy; prefetching
> handles random IO by caching blocks, so it does neither.
> testRandomReadOverBuffer - multiple assertions fail here, and the test
> depends heavily on readAhead values, so it is not very relevant for
> prefetching.
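
As a worked check of the expected open count above, a small sketch (the
variable names are illustrative, not the test's actual fields):

    // one GET per prefetch block: file size over block size, rounded up
    long fileSize = 42L * 1024 * 1024;   // landsat test file, ~42MB
    long blockSize = 8L * 1024 * 1024;   // prefetching block size
    long expectedOpens = (fileSize + blockSize - 1) / blockSize;  // = 6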