This is an automated email from the ASF dual-hosted git repository. alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 27792084826c46d5980c0aafa43affc580e2c576 Author: Hussain Towaileb <[email protected]> AuthorDate: Tue Mar 30 23:56:35 2021 +0300 [NO ISSUE][EXT] Avoid duplicate open for streams + minor refactoring Change-Id: I405e84a30ee67b176c3389db6fd026c408ae1685 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10783 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../abstracts/AbstractExternalInputStreamFactory.java | 13 +++++++++++-- .../input/record/reader/aws/AwsS3InputStream.java | 11 ++++++----- .../input/record/reader/aws/AwsS3InputStreamFactory.java | 15 ++++----------- .../input/record/reader/azure/AzureBlobInputStream.java | 12 ++++++------ .../record/reader/azure/AzureBlobInputStreamFactory.java | 15 ++++----------- .../asterix/external/util/ExternalDataConstants.java | 4 +--- .../apache/asterix/external/util/ExternalDataUtils.java | 4 ++-- 7 files changed, 34 insertions(+), 40 deletions(-) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java index ca55b6f..0b215a0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java @@ -30,6 +30,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.external.api.AsterixInputStream; @@ -69,9 +70,17 @@ public abstract class AbstractExternalInputStreamFactory implements IInputStream return partitionConstraint; } + protected int getPartitionsCount() { + return getPartitionConstraint().getLocations().length; + } + @Override - public abstract void configure(IServiceContext ctx, Map<String, String> configuration, - IWarningCollector warningCollector) throws AlgebricksException; + public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector) + throws AlgebricksException { + this.configuration = configuration; + this.partitionConstraint = + ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager().getClusterLocations(); + } /** * Finds the smallest workload and returns it diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java index e3e53d5..48035f3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@ -42,15 +42,17 @@ import software.amazon.awssdk.services.s3.model.NoSuchKeyException; public class AwsS3InputStream extends AbstractExternalInputStream { private final S3Client s3Client; + private final String bucket; public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException { super(configuration, filePaths); this.s3Client = buildAwsS3Client(configuration); + this.bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); } @Override protected boolean getInputStream() throws IOException { - String bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME); + String fileName = filePaths.get(nextFileIndex); GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder(); GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build(); @@ -67,11 +69,10 @@ public class AwsS3InputStream extends AbstractExternalInputStream { } // Use gzip stream if needed - String filename = filePaths.get(nextFileIndex).toLowerCase(); - if (filename.endsWith(".gz") || filename.endsWith(".gzip")) { - in = new GZIPInputStream(s3Client.getObject(getObjectRequest), ExternalDataConstants.DEFAULT_BUFFER_SIZE); + String lowerCaseFileName = fileName.toLowerCase(); + if (lowerCaseFileName.endsWith(".gz") || lowerCaseFileName.endsWith(".gzip")) { + in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE); } - return true; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java index 5bab888..a1c577a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java @@ -18,19 +18,17 @@ */ package org.apache.asterix.external.input.record.reader.aws; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3; - import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.BiPredicate; import java.util.regex.Matcher; -import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; @@ -61,8 +59,7 @@ public class AwsS3InputStreamFactory extends AbstractExternalInputStreamFactory @Override public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector) throws AlgebricksException { - this.configuration = configuration; - ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext(); + super.configure(ctx, configuration, warningCollector); // Ensure the validity of include/exclude ExternalDataUtils.validateIncludeExclude(configuration); @@ -70,7 +67,7 @@ public class AwsS3InputStreamFactory extends AbstractExternalInputStreamFactory // Prepare to retrieve the objects List<S3Object> filesOnly; - String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME); + String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration); try { @@ -101,12 +98,8 @@ public class AwsS3InputStreamFactory extends AbstractExternalInputStreamFactory warningCollector.warn(warning); } - // Partition constraints - partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations(); - int partitionsCount = partitionConstraint.getLocations().length; - // Distribute work load amongst the partitions - distributeWorkLoad(filesOnly, partitionsCount); + distributeWorkLoad(filesOnly, getPartitionsCount()); } /** diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java index 358c412..3fb3395 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.external.input.record.reader.azure; -import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob; - import java.io.IOException; import java.util.List; import java.util.Map; @@ -43,15 +41,17 @@ import com.azure.storage.blob.models.BlobStorageException; public class AzureBlobInputStream extends AbstractExternalInputStream { private final BlobServiceClient client; + private final String container; public AzureBlobInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException { super(configuration, filePaths); this.client = buildAzureClient(configuration); + this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); } @Override protected boolean getInputStream() throws IOException { - String container = configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME); + String fileName = filePaths.get(nextFileIndex); BlobContainerClient blobContainerClient; BlobClient blob; try { @@ -60,9 +60,9 @@ public class AzureBlobInputStream extends AbstractExternalInputStream { in = blob.openInputStream(); // Use gzip stream if needed - String filename = filePaths.get(nextFileIndex).toLowerCase(); - if (filename.endsWith(".gz") || filename.endsWith(".gzip")) { - in = new GZIPInputStream(in = blob.openInputStream(), ExternalDataConstants.DEFAULT_BUFFER_SIZE); + String lowerCaseFileName = fileName.toLowerCase(); + if (lowerCaseFileName.endsWith(".gz") || lowerCaseFileName.endsWith(".gzip")) { + in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE); } } catch (BlobStorageException ex) { if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java index 167e22a..ca064b1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java @@ -18,19 +18,17 @@ */ package org.apache.asterix.external.input.record.reader.azure; -import static org.apache.asterix.external.util.ExternalDataConstants.*; - import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.BiPredicate; import java.util.regex.Matcher; -import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; @@ -56,10 +54,9 @@ public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFact @Override public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector) throws AlgebricksException { - this.configuration = configuration; - ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext(); + super.configure(ctx, configuration, warningCollector); - String container = configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME); + String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); List<BlobItem> filesOnly = new ArrayList<>(); @@ -87,12 +84,8 @@ public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFact warningCollector.warn(warning); } - // Partition constraints - partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations(); - int partitionsCount = partitionConstraint.getLocations().length; - // Distribute work load amongst the partitions - distributeWorkLoad(filesOnly, partitionsCount); + distributeWorkLoad(filesOnly, getPartitionsCount()); } catch (Exception ex) { throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index fd5b269..45d15df 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -288,6 +288,7 @@ public class ExternalDataConstants { public static final String INVALID_VAL = "invalid value"; public static final String DEFINITION_FIELD_NAME = "definition"; + public static final String CONTAINER_NAME_FIELD_NAME = "container"; public static class AwsS3 { private AwsS3() { @@ -298,7 +299,6 @@ public class ExternalDataConstants { public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId"; public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey"; public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken"; - public static final String CONTAINER_NAME_FIELD_NAME = "container"; public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint"; } @@ -307,8 +307,6 @@ public class ExternalDataConstants { throw new AssertionError("do not instantiate"); } - public static final String CONTAINER_NAME_FIELD_NAME = "container"; - public static final String DEFINITION_FIELD_NAME = "definition"; public static final String CONNECTION_STRING_FIELD_NAME = "connectionString"; public static final String ACCOUNT_NAME_FIELD_NAME = "accountName"; public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 3ac1116..d1bbe89 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -776,7 +776,7 @@ public class ExternalDataUtils { S3Client s3Client = buildAwsS3Client(configuration);; S3Response response; boolean useOldApi = false; - String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME); + String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); String prefix = getPrefix(configuration); try { @@ -943,7 +943,7 @@ public class ExternalDataUtils { // Check if the bucket is present BlobServiceClient blobServiceClient; try { - String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME); + String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); blobServiceClient = buildAzureClient(configuration); BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
