This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit f30653bcb1c3f125d4d097c91556c466cdb3e9c0
Author: Murtadha Hubail <mhub...@apache.org>
AuthorDate: Tue Mar 26 00:37:27 2024 +0300

    [ASTERIXDB-3368][EXT] Abort S3 streams before closing
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - Abort the S3 stream before closing it, so that closing the stream
      early does not force the remaining object content to be read fully.
    
    Change-Id: I5e85ab19734f417e6a38b522db5298534951687e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18207
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Michael Blow <mb...@apache.org>
    Integration-Tests: Michael Blow <mb...@apache.org>
---
 .../external/input/record/reader/aws/AwsS3InputStream.java        | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index f14af53bc7..6386813e2b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -38,6 +38,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.util.LogRedactionUtil;
 
+import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -49,6 +50,7 @@ public class AwsS3InputStream extends 
AbstractExternalInputStream {
     // Configuration
     private final String bucket;
     private final S3Client s3Client;
+    private ResponseInputStream<?> s3InStream;
     private static final int MAX_RETRIES = 5; // We will retry 5 times in case 
of internal error from AWS S3 service
 
     public AwsS3InputStream(Map<String, String> configuration, List<String> 
filePaths) throws HyracksDataException {
@@ -83,7 +85,8 @@ public class AwsS3InputStream extends 
AbstractExternalInputStream {
         int retries = 0;
         while (retries < MAX_RETRIES) {
             try {
-                in = s3Client.getObject(request);
+                s3InStream = s3Client.getObject(request);
+                in = s3InStream;
                 break;
             } catch (NoSuchKeyException ex) {
                 LOGGER.debug(() -> "Key " + 
LogRedactionUtil.userData(request.key()) + " was not found in bucket "
@@ -115,6 +118,9 @@ public class AwsS3InputStream extends 
AbstractExternalInputStream {
     @Override
     public void close() throws IOException {
         if (in != null) {
+            if (s3InStream != null) {
+                s3InStream.abort();
+            }
             CleanupUtils.close(in, null);
         }
         if (s3Client != null) {

Reply via email to