This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit f30653bcb1c3f125d4d097c91556c466cdb3e9c0 Author: Murtadha Hubail <mhub...@apache.org> AuthorDate: Tue Mar 26 00:37:27 2024 +0300 [ASTERIXDB-3368][EXT] Abort S3 streams before closing - user model changes: no - storage format changes: no - interface changes: no Details: - Abort the S3 stream to avoid fully consuming the stream in cases where we would like to close the stream early. Change-Id: I5e85ab19734f417e6a38b522db5298534951687e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18207 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> Integration-Tests: Michael Blow <mb...@apache.org> --- .../external/input/record/reader/aws/AwsS3InputStream.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java index f14af53bc7..6386813e2b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@ -38,6 +38,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.util.LogRedactionUtil; +import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -49,6 +50,7 @@ public class AwsS3InputStream extends AbstractExternalInputStream { // Configuration private final String bucket; private final S3Client s3Client; + private ResponseInputStream<?> s3InStream; private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException { @@ -83,7 +85,8 @@ public class AwsS3InputStream extends AbstractExternalInputStream { int retries = 0; while (retries < MAX_RETRIES) { try { - in = s3Client.getObject(request); + s3InStream = s3Client.getObject(request); + in = s3InStream; break; } catch (NoSuchKeyException ex) { LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket " @@ -115,6 +118,9 @@ public class AwsS3InputStream extends AbstractExternalInputStream { @Override public void close() throws IOException { if (in != null) { + if (s3InStream != null) { + s3InStream.abort(); + } CleanupUtils.close(in, null); } if (s3Client != null) {