HADOOP-15541. [s3a] Shouldn't try to drain stream before aborting connection in case of timeout. Contributed by Sean Mackrory.
(cherry picked from commit d503f65b6689b19278ec2a0cf9da5a8762539de8)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/caf38532
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/caf38532
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/caf38532

Branch: refs/remotes/origin/branch-3.1
Commit: caf38532f3f3eafb4c874a6debddaad2fb2aa201
Parents: 2aaad40
Author: Steve Loughran <[email protected]>
Authored: Wed Jul 11 14:55:11 2018 +0100
Committer: Steve Loughran <[email protected]>
Committed: Wed Jul 11 14:55:11 2018 +0100

----------------------------------------------------------------------
 .../apache/hadoop/fs/s3a/S3AInputStream.java | 24 +++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/caf38532/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index c54d3e26..91a2d9d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 
@@ -155,11 +156,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * @throws IOException on any failure to open the object
    */
   @Retries.OnceTranslated
-  private synchronized void reopen(String reason, long targetPos, long length)
-      throws IOException {
+  private synchronized void reopen(String reason, long targetPos, long length,
+          boolean forceAbort) throws IOException {
 
     if (wrappedStream != null) {
-      closeStream("reopen(" + reason + ")", contentRangeFinish, false);
+      closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort);
     }
 
     contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
@@ -324,7 +325,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
 
           //re-open at specific location if needed
           if (wrappedStream == null) {
-            reopen("read from new offset", targetPos, len);
+            reopen("read from new offset", targetPos, len, false);
           }
         });
   }
@@ -367,8 +368,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       b = wrappedStream.read();
     } catch (EOFException e) {
       return -1;
+    } catch (SocketTimeoutException e) {
+      onReadFailure(e, 1, true);
+      b = wrappedStream.read();
     } catch (IOException e) {
-      onReadFailure(e, 1);
+      onReadFailure(e, 1, false);
       b = wrappedStream.read();
     }
     return b;
@@ -393,12 +397,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * @throws IOException any exception thrown on the re-open attempt.
    */
   @Retries.OnceTranslated
-  private void onReadFailure(IOException ioe, int length) throws IOException {
+  private void onReadFailure(IOException ioe, int length, boolean forceAbort)
+          throws IOException {
 
     LOG.info("Got exception while trying to read from stream {}" +
         " trying to recover: " + ioe, uri);
     streamStatistics.readException();
-    reopen("failure recovery", pos, length);
+    reopen("failure recovery", pos, length, forceAbort);
   }
 
   /**
@@ -446,8 +451,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     } catch (EOFException e) {
       // the base implementation swallows EOFs.
       return -1;
+    } catch (SocketTimeoutException e) {
+      onReadFailure(e, len, true);
+      bytes = wrappedStream.read(buf, off, len);
     } catch (IOException e) {
-      onReadFailure(e, len);
+      onReadFailure(e, len, false);
       bytes= wrappedStream.read(buf, off, len);
     }
     return bytes;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
