ahmarsuhail commented on code in PR #4386:
URL: https://github.com/apache/hadoop/pull/4386#discussion_r893675594
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +213,83 @@ void close(InputStream inputStream) {
this.s3Objects.remove(inputStream);
}
+ if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
+ // don't bother with async io.
+ drain(false, "close() operation", numRemainingBytes, obj, inputStream);
+ } else {
+ LOG.debug("initiating asynchronous drain of {} bytes",
numRemainingBytes);
+ // schedule an async drain/abort with references to the fields so they
+ // can be reused
+ client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream));
+ }
+ }
+
+ /**
+ * drain the stream. This method is intended to be
+ * used directly or asynchronously, and measures the
+ * duration of the operation in the stream statistics.
+ *
+ * @param shouldAbort force an abort; used if explicitly requested.
+ * @param reason reason for stream being closed; used in messages
+ * @param remaining remaining bytes
+ * @param requestObject http request object
+ * @param inputStream stream to close.
+ * @return was the stream aborted?
+ */
+ private boolean drain(final boolean shouldAbort, final String reason, final long remaining,
+ final S3Object requestObject, final InputStream inputStream) {
+
+ try {
+ return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort),
+ () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream));
+ } catch (IOException e) {
+ // this is only here because invokeTrackingDuration() has it in its
+ // signature
+ return shouldAbort;
+ }
+ }
+
+ /**
+ * Drain or abort the inner stream.
+ * Exceptions are swallowed.
+ * If a close() is attempted and fails, the operation escalates to
+ * an abort.
+ *
+ * @param shouldAbort force an abort; used if explicitly requested.
+ * @param reason reason for stream being closed; used in messages
+ * @param remaining remaining bytes
+ * @param requestObject http request object
+ * @param inputStream stream to close.
+ * @return was the stream aborted?
+ */
+ private boolean drainOrAbortHttpStream(boolean shouldAbort, final String reason,
+ final long remaining, final S3Object requestObject, final InputStream inputStream) {
+
+ if (!shouldAbort && remaining > 0) {
+ try {
+ long drained = 0;
+ byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
+ while (true) {
+ final int count = inputStream.read(buffer);
+ if (count < 0) {
+ // no more data is left
+ break;
+ }
+ drained += count;
+ }
+ LOG.debug("Drained stream of {} bytes", drained);
+ } catch (Exception e) {
+ // exception escalates to an abort
+ LOG.debug("When closing {} stream for {}, will abort the stream", uri,
reason, e);
+ shouldAbort = true;
+ }
+ }
Io.closeIgnoringIoException(inputStream);
- Io.closeIgnoringIoException(obj);
+ Io.closeIgnoringIoException(requestObject);
Review Comment:
This was a new `Io` class added as part of the initial prefetching PR. I've removed this class and updated its usages to `cleanupWithLogger`.
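
For reference, a minimal sketch of what that swap could look like at the tail of `drainOrAbortHttpStream()`. The class and helper names here are hypothetical; only `org.apache.hadoop.io.IOUtils.cleanupWithLogger()` is the existing Hadoop utility, which closes each `Closeable` it is given and logs, rather than propagates, any failure:

```java
import java.io.InputStream;

import com.amazonaws.services.s3.model.S3Object;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical stand-in for S3File, just to show the call shape.
final class DrainCleanupSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(DrainCleanupSketch.class);

  static void closeStreams(S3Object requestObject, InputStream inputStream) {
    // was: Io.closeIgnoringIoException(inputStream);
    //      Io.closeIgnoringIoException(requestObject);
    // cleanupWithLogger closes each Closeable, logging any exception
    // against LOG instead of throwing it.
    IOUtils.cleanupWithLogger(LOG, inputStream, requestObject);
  }
}
```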