[
https://issues.apache.org/jira/browse/HADOOP-18221?focusedWorklogId=771857&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-771857
]
ASF GitHub Bot logged work on HADOOP-18221:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/May/22 12:41
Start Date: 18/May/22 12:41
Worklog Time Spent: 10m
Work Description: steveloughran commented on code in PR #4294:
URL: https://github.com/apache/hadoop/pull/4294#discussion_r875834883
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +223,40 @@ void close(InputStream inputStream) {
this.s3Objects.remove(inputStream);
}
- Io.closeIgnoringIoException(inputStream);
- Io.closeIgnoringIoException(obj);
+ this.futurePool.executeRunnable(new DrainTask(inputStream, obj));
}
+
+
+ /**
+ * Drain task that is submitted to the future pool.
+ */
+ private static class DrainTask implements Runnable {
+
+ private final InputStream inputStream;
+ private final S3Object obj;
+ private long drained;
+
+ DrainTask(InputStream inputStream, S3Object obj) {
+ this.inputStream = inputStream;
+ this.obj = obj;
+ }
+
+ @Override
+ public void run() {
+ try {
+
+ while(this.inputStream.read() >= 0) {
+ drained++;
+ }
+
+ LOG.debug("Drained stream of {} bytes", drained);
+
+ Io.closeIgnoringIoException(this.inputStream);
+ Io.closeIgnoringIoException(this.obj);
+ } catch (Exception e) {
Review Comment:
If this happens then the read raised an exception. The stream MUST be
aborted to stop it being returned to the HTTP connection pool, as its
connection is probably broken.
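A minimal sketch of the drain-then-close-or-abort idea, not the actual S3A code. `DrainOrAbortSketch` and its `Abortable` interface are hypothetical names used only to keep the example self-contained; in the real patch the abort would presumably go through the SDK stream's own abort call.

```java
import java.io.IOException;
import java.io.InputStream;

/** Sketch only: every name here is hypothetical and not part of hadoop-aws. */
final class DrainOrAbortSketch {

  /** Hypothetical stand-in for a stream wrapper that can abort its HTTP connection. */
  interface Abortable {
    void abort();
  }

  /**
   * Drain the stream and close it; if reading or closing fails, abort instead
   * so that a probably-broken connection is not handed back to the pool.
   *
   * @return true if the stream was aborted, false if it was drained and closed.
   */
  static boolean drainOrAbort(InputStream in, Abortable abortable) {
    try {
      while (in.read() >= 0) {
        // discard the remaining bytes
      }
      in.close();
      return false;
    } catch (IOException e) {
      // the read (or close) failed: do not recycle this connection
      abortable.abort();
      return true;
    }
  }
}
```

The return value is only there so a caller could log or count aborts; it is an assumption of this sketch, not something the PR requires.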
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +223,40 @@ void close(InputStream inputStream) {
this.s3Objects.remove(inputStream);
}
- Io.closeIgnoringIoException(inputStream);
- Io.closeIgnoringIoException(obj);
+ this.futurePool.executeRunnable(new DrainTask(inputStream, obj));
}
+
+
+ /**
+ * Drain task that is submitted to the future pool.
+ */
+ private static class DrainTask implements Runnable {
Review Comment:
1. Declare final to keep the style checker happy.
2. I'd prefer not to use Runnable, but completable futures instead. Look at
drainOrAbortHttpStream() and its use; at that point you can just pass in a
function.
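One way the second point could look, as a rough sketch only. It does not reproduce drainOrAbortHttpStream(); `AsyncDrainSketch` and `drainAsync` are hypothetical names, and the executor parameter stands in for whatever pool the stream context provides.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Sketch only: hypothetical names, not the hadoop-aws implementation. */
final class AsyncDrainSketch {

  /**
   * Submit the drain as a function rather than a dedicated Runnable class.
   * The future completes with the number of bytes drained, or exceptionally
   * if the read fails. Abort handling is deliberately left out of this sketch.
   */
  static CompletableFuture<Long> drainAsync(InputStream in, Executor executor) {
    return CompletableFuture.supplyAsync(() -> {
      long drained = 0;
      try (InputStream stream = in) {
        while (stream.read() >= 0) {
          drained++;
        }
      } catch (IOException e) {
        // surface the failure through the future
        throw new UncheckedIOException(e);
      }
      return drained;
    }, executor);
  }
}
```

Returning a future also gives the caller something to wait on or chain from, which a fire-and-forget Runnable does not.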
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -98,6 +106,7 @@ public S3File(
this.streamStatistics = streamStatistics;
this.changeTracker = changeTracker;
this.s3Objects = new IdentityHashMap<InputStream, S3Object>();
+ this.futurePool = context.getFuturePool();
Review Comment:
if this is only for the drain, given the context is already stored, you can
just get the pool when needed
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +223,40 @@ void close(InputStream inputStream) {
this.s3Objects.remove(inputStream);
}
- Io.closeIgnoringIoException(inputStream);
- Io.closeIgnoringIoException(obj);
+ this.futurePool.executeRunnable(new DrainTask(inputStream, obj));
}
+
+
+ /**
+ * Drain task that is submitted to the future pool.
+ */
+ private static class DrainTask implements Runnable {
+
+ private final InputStream inputStream;
+ private final S3Object obj;
+ private long drained;
+
+ DrainTask(InputStream inputStream, S3Object obj) {
+ this.inputStream = inputStream;
+ this.obj = obj;
+ }
+
+ @Override
+ public void run() {
+ try {
+
+ while(this.inputStream.read() >= 0) {
Review Comment:
Look at the changes in the Hadoop trunk S3A input stream here: it reads into a
buffer for draining, and is marginally faster.
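For illustration, a buffered drain might look roughly like this. It is a sketch under assumptions, not the trunk code: `BufferedDrainSketch` is a hypothetical name and the 16 KiB buffer size is an arbitrary choice.

```java
import java.io.IOException;
import java.io.InputStream;

/** Sketch only: hypothetical class, buffer size chosen arbitrarily. */
final class BufferedDrainSketch {

  /** Assumed buffer size; the value used in trunk may differ. */
  static final int DRAIN_BUFFER_SIZE = 16 * 1024;

  /**
   * Read the rest of the stream in buffer-sized chunks instead of one
   * byte per call.
   *
   * @return the number of bytes drained.
   */
  static long drain(InputStream in) throws IOException {
    byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
    long drained = 0;
    int n;
    // read(byte[]) returns -1 at end of stream
    while ((n = in.read(buffer)) >= 0) {
      drained += n;
    }
    return drained;
  }
}
```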
Issue Time Tracking
-------------------
Worklog Id: (was: 771857)
Time Spent: 1.5h (was: 1h 20m)
> stream warns Not all bytes were read from the S3ObjectInputStream when closed
> -----------------------------------------------------------------------------
>
> Key: HADOOP-18221
> URL: https://issues.apache.org/jira/browse/HADOOP-18221
> Project: Hadoop Common
> Issue Type: Sub-task
> Reporter: Ahmar Suhail
> Assignee: Ahmar Suhail
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Issue: [https://github.com/aws/aws-sdk-java/issues/1211] has resurfaced in
> the prefetching stream when it is closed before reading for blocks is
> complete. This can be fixed by draining the stream before closing.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)