This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 3ea8ef15a2aa47fd3d6e87cb19acd5254efa9252
Author: Hussain Towaileb <[email protected]>
AuthorDate: Tue Apr 6 04:00:34 2021 +0300

    [ASTERIXDB-2870][EXT]: Close client after it is used
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Close the S3 client after it is used to release
      the connections.
    
    Change-Id: I8611b5a05fcbd8a4a9a4556c290281fd5cbd56a4
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10885
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Till Westmann
---
 .../input/record/reader/aws/AwsS3InputStream.java    | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index f558820..8bd7a51 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -30,6 +30,7 @@ import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.input.stream.AbstractMultipleInputStream;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.util.LogRedactionUtil;
@@ -46,7 +47,7 @@ public class AwsS3InputStream extends 
AbstractMultipleInputStream {
     private static final Logger LOGGER = LogManager.getLogger();
 
     // Configuration
-    private final Map<String, String> configuration;
+    private final String bucket;
     private final int bufferSize;
 
     private final S3Client s3Client;
@@ -56,10 +57,10 @@ public class AwsS3InputStream extends 
AbstractMultipleInputStream {
     private int nextFileIndex = 0;
 
     public AwsS3InputStream(Map<String, String> configuration, List<String> 
filePaths) throws HyracksDataException {
-        this.configuration = configuration;
         this.filePaths = filePaths;
         this.s3Client = buildAwsS3Client(configuration);
         this.bufferSize = 
ExternalDataUtils.getOrDefaultBufferSize(configuration);
+        this.bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
     }
 
     @Override
@@ -71,9 +72,6 @@ public class AwsS3InputStream extends 
AbstractMultipleInputStream {
 
         // Finished reading all the files
         if (nextFileIndex >= filePaths.size()) {
-            if (in != null) {
-                CleanupUtils.close(in, null);
-            }
             return false;
         }
 
@@ -82,9 +80,9 @@ public class AwsS3InputStream extends 
AbstractMultipleInputStream {
             CleanupUtils.close(in, null);
         }
 
-        String bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
+        String fileName = filePaths.get(nextFileIndex);
         GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
-        GetObjectRequest getObjectRequest = 
getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
+        GetObjectRequest getObjectRequest = 
getObjectBuilder.bucket(bucket).key(fileName).build();
 
         // Have a reference to the S3 stream to ensure that if GZipInputStream 
causes an IOException because of reading
         // the header, then the S3 stream gets closed in the close method
@@ -100,9 +98,8 @@ public class AwsS3InputStream extends 
AbstractMultipleInputStream {
         }
 
         // Use gzip stream if needed
-        String filename = filePaths.get(nextFileIndex).toLowerCase();
-        if (filename.endsWith(".gz") || filename.endsWith(".gzip")) {
-            in = new GZIPInputStream(s3Client.getObject(getObjectRequest), 
bufferSize);
+        if (StringUtils.endsWithIgnoreCase(fileName, ".gz") || 
StringUtils.endsWithIgnoreCase(fileName, ".gzip")) {
+            in = new GZIPInputStream(in, bufferSize);
         }
 
         // Current file ready, point to the next file
@@ -136,6 +133,9 @@ public class AwsS3InputStream extends 
AbstractMultipleInputStream {
         if (in != null) {
             CleanupUtils.close(in, null);
         }
+        if (s3Client != null) {
+            CleanupUtils.close(s3Client, null);
+        }
     }
 
     @Override

Reply via email to