This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 6ddbbfa3c567c3af2132f0f4dcee5253011c6934 Author: Ali Alsuliman <[email protected]> AuthorDate: Tue Mar 1 20:45:40 2022 +0300 [NO ISSUE][EXT] Set Azure request timeout - user model changes: no - storage format changes: no - interface changes: no Details: - add azure_request_timeout configurable property - default timeout to 120 seconds - catch exceptions from external input stream and wrap in a RuntimeDataException to avoid halt due to non-serializable exceptions from external sources Change-Id: Iebf988384b0bc5d6ae7688c65747227dbde062b1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15483 Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Ali Alsuliman <[email protected]> --- .../asterix/app/translator/QueryTranslator.java | 16 ++++---- .../api/cluster_state_1/cluster_state_1.1.regexadm | 1 + .../cluster_state_1_full.1.regexadm | 1 + .../cluster_state_1_less.1.regexadm | 1 + .../asterix/common/config/ExternalProperties.java | 7 +++- .../reader/azure/blob/AzureBlobInputStream.java | 11 ++++-- .../azure/blob/AzureBlobInputStreamFactory.java | 9 ++++- .../azure/datalake/AzureDataLakeInputStream.java | 12 +++--- .../datalake/AzureDataLakeInputStreamFactory.java | 9 ++++- .../parquet/AzureBlobParquetReaderFactory.java | 4 +- .../parquet/AzureDataLakeParquetReaderFactory.java | 5 ++- .../input/stream/AbstractMultipleInputStream.java | 44 +++++++++++++--------- .../asterix/external/util/ExternalDataUtils.java | 30 +++++++++------ 13 files changed, 98 insertions(+), 52 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index f7da31d..06458ce 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -65,6 +65,7 @@ import org.apache.asterix.app.result.fields.ErrorsPrinter; import org.apache.asterix.app.result.fields.ResultHandlePrinter; import org.apache.asterix.app.result.fields.ResultsPrinter; import org.apache.asterix.app.result.fields.StatusPrinter; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.IClientRequest; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.api.IRequestTracker; @@ -829,7 +830,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen metadataProvider, mdTxnCtx); ExternalDataUtils.normalize(properties); ExternalDataUtils.validate(properties); - validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx); + validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx, + appCtx); datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(), TransactionState.COMMIT); break; @@ -4773,13 +4775,13 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails, - Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx) - throws AlgebricksException, HyracksDataException { + Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx, + IApplicationContext appCtx) throws AlgebricksException, HyracksDataException { // Validate adapter specific properties String adapter = externalDetails.getAdapter(); Map<String, String> details = new HashMap<>(properties); details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter); - validateAdapterSpecificProperties(details, srcLoc); + validateAdapterSpecificProperties(details, srcLoc, appCtx); } /** @@ -4787,9 +4789,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen * * @param configuration external source properties */ - protected void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc) - throws CompilationException { - ExternalDataUtils.validateAdapterSpecificProperties(configuration, srcLoc, warningCollector); + protected void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc, + IApplicationContext appCtx) throws CompilationException { + ExternalDataUtils.validateAdapterSpecificProperties(configuration, srcLoc, warningCollector, appCtx); } protected enum CreateResult { diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index bf736b4..1805e7a 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -8,6 +8,7 @@ "active\.memory\.global\.budget" : 67108864, "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, + "azure.request.timeout" : 120, "compiler\.arrayindex" : true, "compiler\.external\.field\.pushdown" : true, "compiler\.framesize" : 32768, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index d52cedd..743347a 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -8,6 +8,7 @@ "active\.memory\.global\.budget" : 67108864, "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, + "azure.request.timeout" : 120, "compiler\.arrayindex" : true, "compiler\.external\.field\.pushdown" : true, "compiler\.framesize" : 32768, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm index 4f5267f..4359bd9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -8,6 +8,7 @@ "active\.memory\.global\.budget" : 67108864, "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, + "azure.request.timeout" : 120, "compiler\.arrayindex" : true, "compiler\.external\.field\.pushdown" : true, "compiler\.framesize" : 32768, diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java index 46258bf..515aad6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java @@ -51,7 +51,8 @@ public class ExternalProperties extends AbstractProperties { StorageUtil.getIntSizeInBytes(200, StorageUtil.StorageUnit.MEGABYTE), "The maximum accepted web request size in bytes"), REQUESTS_ARCHIVE_SIZE(NONNEGATIVE_INTEGER, 50, "The maximum number of archived requests to maintain"), - LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a UDF in seconds"); + LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a UDF in seconds"), + AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client requests in seconds"); private final IOptionType type; private final Object defaultValue; @@ -78,6 +79,7 @@ public class ExternalProperties extends AbstractProperties { case MAX_WAIT_ACTIVE_CLUSTER: case MAX_WEB_REQUEST_SIZE: case LIBRARY_DEPLOY_TIMEOUT: + case AZURE_REQUEST_TIMEOUT: return Section.COMMON; case CC_JAVA_OPTS: case NC_JAVA_OPTS: @@ -155,4 +157,7 @@ public class ExternalProperties extends AbstractProperties { return accessor.getInt(Option.LIBRARY_DEPLOY_TIMEOUT); } + public int getAzureRequestTimeout() { + return accessor.getInt(Option.AZURE_REQUEST_TIMEOUT); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java index b402f25..cdb3834 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.zip.GZIPInputStream; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; @@ -45,9 +46,10 @@ public class AzureBlobInputStream extends AbstractExternalInputStream { private final BlobServiceClient client; private final String container; - public AzureBlobInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException { + public AzureBlobInputStream(IApplicationContext appCtx, Map<String, String> configuration, List<String> filePaths) + throws HyracksDataException { super(configuration, filePaths); - this.client = buildAzureClient(configuration); + this.client = buildAzureClient(appCtx, configuration); this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); } @@ -81,9 +83,10 @@ public class AzureBlobInputStream extends AbstractExternalInputStream { return true; } - private BlobServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException { + private BlobServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration) + throws HyracksDataException { try { - return ExternalDataUtils.Azure.buildAzureBlobClient(configuration); + return ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration); } catch (CompilationException ex) { throw HyracksDataException.create(ex); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java index bf904a4..064b319 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.PriorityQueue; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory; import org.apache.asterix.external.util.ExternalDataUtils; @@ -41,7 +42,10 @@ public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFact @Override public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException { - return new AzureBlobInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths()); + IApplicationContext appCtx = + (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); + return new AzureBlobInputStream(appCtx, configuration, + partitionWorkLoadsBasedOnSize.get(partition).getFilePaths()); } @Override @@ -49,10 +53,11 @@ public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFact throws AlgebricksException { super.configure(ctx, configuration, warningCollector); + IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext(); // Ensure the validity of include/exclude ExternalDataUtils.validateIncludeExclude(configuration); IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); - BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration); + BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration); List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java index b7d142f..e34d188 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.zip.GZIPInputStream; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; @@ -45,10 +46,10 @@ public class AzureDataLakeInputStream extends AbstractExternalInputStream { private final DataLakeServiceClient client; private final String container; - public AzureDataLakeInputStream(Map<String, String> configuration, List<String> filePaths) - throws HyracksDataException { + public AzureDataLakeInputStream(IApplicationContext appCtx, Map<String, String> configuration, + List<String> filePaths) throws HyracksDataException { super(configuration, filePaths); - this.client = buildAzureClient(configuration); + this.client = buildAzureClient(appCtx, configuration); this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); } @@ -82,9 +83,10 @@ public class AzureDataLakeInputStream extends AbstractExternalInputStream { return true; } - private DataLakeServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException { + private DataLakeServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration) + throws HyracksDataException { try { - return ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration); + return ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration); } catch (CompilationException ex) { throw HyracksDataException.create(ex); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java index e145e1f..e9f8d4c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.PriorityQueue; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory; import org.apache.asterix.external.util.ExternalDataUtils; @@ -41,7 +42,10 @@ public class AzureDataLakeInputStreamFactory extends AbstractExternalInputStream @Override public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException { - return new AzureDataLakeInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths()); + IApplicationContext appCtx = + (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); + return new AzureDataLakeInputStream(appCtx, configuration, + partitionWorkLoadsBasedOnSize.get(partition).getFilePaths()); } @Override @@ -49,10 +53,11 @@ public class AzureDataLakeInputStreamFactory extends AbstractExternalInputStream throws AlgebricksException { super.configure(ctx, configuration, warningCollector); + IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext(); // Ensure the validity of include/exclude ExternalDataUtils.validateIncludeExclude(configuration); IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); - DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration); + DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration); List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(client, configuration, includeExcludeMatcher, warningCollector); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java index 1f82dae..c2251df 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.external.input.HDFSDataSourceFactory; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; @@ -45,7 +46,8 @@ public class AzureBlobParquetReaderFactory extends HDFSDataSourceFactory { @Override public void configure(IServiceContext serviceCtx, Map<String, String> configuration, IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException { - BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration); + IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext(); + BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration); //Get endpoint String endPoint = extractEndPoint(blobServiceClient.getAccountUrl()); //Get path diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java index 8474a74..db87868 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.external.input.HDFSDataSourceFactory; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; @@ -45,7 +46,9 @@ public class AzureDataLakeParquetReaderFactory extends HDFSDataSourceFactory { @Override public void configure(IServiceContext serviceCtx, Map<String, String> configuration, IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException { - DataLakeServiceClient dataLakeServiceClient = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration); + IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext(); + DataLakeServiceClient dataLakeServiceClient = + ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration); //Get endpoint String endPoint = extractEndPoint(dataLakeServiceClient.getAccountUrl()); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java index 8f032d8..18ef150 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java @@ -21,10 +21,13 @@ package org.apache.asterix.external.input.stream; import java.io.IOException; import java.io.InputStream; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IStreamNotificationHandler; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.ExceptionUtils; /** * Base class for a source stream that is composed of multiple separate input streams. Reading proceeds one stream at @@ -54,25 +57,30 @@ public abstract class AbstractMultipleInputStream extends AsterixInputStream { @Override public final int read(byte[] b, int off, int len) throws IOException { - if (in == null) { - if (!advance()) { - return -1; + try { + if (in == null) { + if (!advance()) { + return -1; + } } + int result = in.read(b, off, len); + if (result < 0 && (lastByte != ExternalDataConstants.BYTE_LF) + && (lastByte != ExternalDataConstants.BYTE_CR)) { + // return a new line at the end of every file <--Might create problems for some cases + // depending on the parser implementation--> + lastByte = ExternalDataConstants.BYTE_LF; + b[off] = ExternalDataConstants.BYTE_LF; + return 1; + } + while ((result < 0) && advance()) { + result = in.read(b, off, len); + } + if (result > 0) { + lastByte = b[(off + result) - 1]; + } + return result; + } catch (Exception e) { + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ExceptionUtils.getMessageOrToString(e)); } - int result = in.read(b, off, len); - if (result < 0 && (lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != ExternalDataConstants.BYTE_CR)) { - // return a new line at the end of every file <--Might create problems for some cases - // depending on the parser implementation--> - lastByte = ExternalDataConstants.BYTE_LF; - b[off] = ExternalDataConstants.BYTE_LF; - return 1; - } - while ((result < 0) && advance()) { - result = in.read(b, off, len); - } - if (result > 0) { - lastByte = b[(off + result) - 1]; - } - return result; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 22040e2..8e38eed 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -90,6 +90,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; @@ -139,6 +140,7 @@ import com.azure.storage.blob.BlobServiceClientBuilder; import com.azure.storage.blob.models.BlobItem; import com.azure.storage.blob.models.ListBlobsOptions; import com.azure.storage.common.StorageSharedKeyCredential; +import com.azure.storage.common.policy.RequestRetryOptions; import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; @@ -597,7 +599,7 @@ public class ExternalDataUtils { * @param configuration properties */ public static void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc, - IWarningCollector collector) throws CompilationException { + IWarningCollector collector, IApplicationContext appCtx) throws CompilationException { String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE); switch (type) { @@ -605,10 +607,10 @@ public class ExternalDataUtils { AwsS3.validateProperties(configuration, srcLoc, collector); break; case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB: - Azure.validateAzureBlobProperties(configuration, srcLoc, collector); + Azure.validateAzureBlobProperties(configuration, srcLoc, collector, appCtx); break; case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE: - Azure.validateAzureDataLakeProperties(configuration, srcLoc, collector); + Azure.validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx); break; case KEY_ADAPTER_NAME_GCS: GCS.validateProperties(configuration, srcLoc, collector); @@ -1277,8 +1279,8 @@ public class ExternalDataUtils { * @param configuration properties * @return client */ - public static BlobServiceClient buildAzureBlobClient(Map<String, String> configuration) - throws CompilationException { + public static BlobServiceClient buildAzureBlobClient(IApplicationContext appCtx, + Map<String, String> configuration) throws CompilationException { String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME); String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME); String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME); @@ -1292,6 +1294,9 @@ public class ExternalDataUtils { // Client builder BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + int timeout = appCtx.getExternalProperties().getAzureRequestTimeout(); + RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null); + builder.retryOptions(requestRetryOptions); // Endpoint is required if (endpoint == null) { @@ -1428,8 +1433,8 @@ public class ExternalDataUtils { * @param configuration properties * @return client */ - public static DataLakeServiceClient buildAzureDatalakeClient(Map<String, String> configuration) - throws CompilationException { + public static DataLakeServiceClient buildAzureDatalakeClient(IApplicationContext appCtx, + Map<String, String> configuration) throws CompilationException { String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME); String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME); String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME); @@ -1443,6 +1448,9 @@ public class ExternalDataUtils { // Client builder DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder(); + int timeout = appCtx.getExternalProperties().getAzureRequestTimeout(); + RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null); + builder.retryOptions(requestRetryOptions); // Endpoint is required if (endpoint == null) { @@ -1702,7 +1710,7 @@ public class ExternalDataUtils { * @throws CompilationException Compilation exception */ public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc, - IWarningCollector collector) throws CompilationException { + IWarningCollector collector, IApplicationContext appCtx) throws CompilationException { // check if the format property is present if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { @@ -1715,7 +1723,7 @@ public class ExternalDataUtils { BlobServiceClient blobServiceClient; try { String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - blobServiceClient = buildAzureBlobClient(configuration); + blobServiceClient = buildAzureBlobClient(appCtx, configuration); BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container); // Get all objects in a container and extract the paths to files @@ -1741,7 +1749,7 @@ public class ExternalDataUtils { * @throws CompilationException Compilation exception */ public static void validateAzureDataLakeProperties(Map<String, String> configuration, SourceLocation srcLoc, - IWarningCollector collector) throws CompilationException { + IWarningCollector collector, IApplicationContext appCtx) throws CompilationException { // check if the format property is present if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { @@ -1754,7 +1762,7 @@ public class ExternalDataUtils { DataLakeServiceClient dataLakeServiceClient; try { String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - dataLakeServiceClient = buildAzureDatalakeClient(configuration); + dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration); DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(container); // Get all objects in a container and extract the paths to files
