[ 
https://issues.apache.org/jira/browse/HADOOP-18231?focusedWorklogId=780016&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-780016
 ]

ASF GitHub Bot logged work on HADOOP-18231:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jun/22 15:46
            Start Date: 09/Jun/22 15:46
    Worklog Time Spent: 10m 
      Work Description: ahmarsuhail commented on code in PR #4386:
URL: https://github.com/apache/hadoop/pull/4386#discussion_r893675594


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +213,83 @@ void close(InputStream inputStream) {
       this.s3Objects.remove(inputStream);
     }
 
+    if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
+      // don't bother with async io.
+      drain(false, "close() operation", numRemainingBytes, obj, inputStream);
+    } else {
+      LOG.debug("initiating asynchronous drain of {} bytes", 
numRemainingBytes);
+      // schedule an async drain/abort with references to the fields so they
+      // can be reused
+      client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream));
+    }
+  }
+
+  /**
+   * drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drain(final boolean shouldAbort, final String reason, final long remaining,
+      final S3Object requestObject, final InputStream inputStream) {
+
+    try {
+      return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort),
+          () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream));
+    } catch (IOException e) {
+      // this is only here because invokeTrackingDuration() has it in its
+      // signature
+      return shouldAbort;
+    }
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drainOrAbortHttpStream(boolean shouldAbort, final String reason,
+      final long remaining, final S3Object requestObject, final InputStream inputStream) {
+
+    if (!shouldAbort && remaining > 0) {
+      try {
+        long drained = 0;
+        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
+        while (true) {
+          final int count = inputStream.read(buffer);
+          if (count < 0) {
+            // no more data is left
+            break;
+          }
+          drained += count;
+        }
+        LOG.debug("Drained stream of {} bytes", drained);
+      } catch (Exception e) {
+        // exception escalates to an abort
+        LOG.debug("When closing {} stream for {}, will abort the stream", uri, 
reason, e);
+        shouldAbort = true;
+      }
+    }
     Io.closeIgnoringIoException(inputStream);
-    Io.closeIgnoringIoException(obj);
+    Io.closeIgnoringIoException(requestObject);

Review Comment:
   This was a new `Io` class added as part of the initial prefetching PR. I've removed this class and updated its usages to use `cleanupWithLogger`.
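
   For illustration, a minimal self-contained sketch of that replacement, using
   the existing cleanupWithLogger helper from org.apache.hadoop.io.IOUtils (the
   CleanupSketch class and its stand-in stream are hypothetical, not from the PR):

   import java.io.ByteArrayInputStream;
   import java.io.InputStream;

   import org.apache.hadoop.io.IOUtils;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class CleanupSketch {
     private static final Logger LOG = LoggerFactory.getLogger(CleanupSketch.class);

     public static void main(String[] args) {
       // stand-in for the wrapped S3 object stream closed in S3File
       InputStream inputStream = new ByteArrayInputStream(new byte[0]);
       // before: Io.closeIgnoringIoException(inputStream);
       // after: any IOException raised by close() is logged rather than thrown
       IOUtils.cleanupWithLogger(LOG, inputStream);
     }
   }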





Issue Time Tracking
-------------------

    Worklog Id:     (was: 780016)
    Time Spent: 3h 20m  (was: 3h 10m)

> tests in ITestS3AInputStreamPerformance are failing 
> ----------------------------------------------------
>
>                 Key: HADOOP-18231
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18231
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Ahmar Suhail
>            Assignee: Ahmar Suhail
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> The following tests fail when prefetching is enabled:
> testRandomIORandomPolicy - expects the stream to be opened 4 times (once for
> every random read), but prefetching only opens it twice.
> testDecompressionSequential128K - expects the stream to be opened once, but
> prefetching opens it once for each block of the file. The landsat file used
> in the test is 42MB and the prefetching block size is 8MB, so the expected
> open count is 6 (see the arithmetic sketch below).
> testReadWithNormalPolicy - same as above.
> testRandomIONormalPolicy - executes random IO, but with a normal policy.
> S3AInputStream aborts the stream and changes the policy; prefetching handles
> random IO by caching blocks, so it does neither.
> testRandomReadOverBuffer - multiple assertions fail here, and the test
> depends heavily on readAhead values, so it is not very relevant for
> prefetching.
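
As a cross-check on the expected open count quoted above: prefetching opens the
stream once per block, i.e. the ceiling of file size over block size. A minimal
sketch of that arithmetic (the ExpectedOpenCount class name is illustrative, and
the sizes are the ones given in the description):

public class ExpectedOpenCount {
  public static void main(String[] args) {
    long fileSizeMB = 42;   // landsat test file size quoted above
    long blockSizeMB = 8;   // prefetching block size quoted above
    // one stream open per prefetch block: ceiling(42 / 8) = 6
    long expectedOpens = (fileSizeMB + blockSizeMB - 1) / blockSizeMB;
    System.out.println(expectedOpens);  // prints 6
  }
}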



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
