This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 6ddbbfa3c567c3af2132f0f4dcee5253011c6934
Author: Ali Alsuliman <[email protected]>
AuthorDate: Tue Mar 1 20:45:40 2022 +0300

    [NO ISSUE][EXT] Set Azure request timeout
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - add a configurable azure.request.timeout property
      (AZURE_REQUEST_TIMEOUT option, in seconds)
    - default the timeout to 120 seconds
    - catch exceptions thrown by the external input stream and
      wrap them in a RuntimeDataException to avoid a halt caused
      by non-serializable exceptions from external sources
    
    Change-Id: Iebf988384b0bc5d6ae7688c65747227dbde062b1
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15483
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Ali Alsuliman <[email protected]>
---
 .../asterix/app/translator/QueryTranslator.java    | 16 ++++----
 .../api/cluster_state_1/cluster_state_1.1.regexadm |  1 +
 .../cluster_state_1_full.1.regexadm                |  1 +
 .../cluster_state_1_less.1.regexadm                |  1 +
 .../asterix/common/config/ExternalProperties.java  |  7 +++-
 .../reader/azure/blob/AzureBlobInputStream.java    | 11 ++++--
 .../azure/blob/AzureBlobInputStreamFactory.java    |  9 ++++-
 .../azure/datalake/AzureDataLakeInputStream.java   | 12 +++---
 .../datalake/AzureDataLakeInputStreamFactory.java  |  9 ++++-
 .../parquet/AzureBlobParquetReaderFactory.java     |  4 +-
 .../parquet/AzureDataLakeParquetReaderFactory.java |  5 ++-
 .../input/stream/AbstractMultipleInputStream.java  | 44 +++++++++++++---------
 .../asterix/external/util/ExternalDataUtils.java   | 30 +++++++++------
 13 files changed, 98 insertions(+), 52 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f7da31d..06458ce 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -65,6 +65,7 @@ import org.apache.asterix.app.result.fields.ErrorsPrinter;
 import org.apache.asterix.app.result.fields.ResultHandlePrinter;
 import org.apache.asterix.app.result.fields.ResultsPrinter;
 import org.apache.asterix.app.result.fields.StatusPrinter;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.api.IRequestTracker;
@@ -829,7 +830,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                             metadataProvider, mdTxnCtx);
                     ExternalDataUtils.normalize(properties);
                     ExternalDataUtils.validate(properties);
-                    validateExternalDatasetProperties(externalDetails, 
properties, dd.getSourceLocation(), mdTxnCtx);
+                    validateExternalDatasetProperties(externalDetails, 
properties, dd.getSourceLocation(), mdTxnCtx,
+                            appCtx);
                     datasetDetails = new 
ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
                             TransactionState.COMMIT);
                     break;
@@ -4773,13 +4775,13 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
     }
 
     protected void validateExternalDatasetProperties(ExternalDetailsDecl 
externalDetails,
-            Map<String, String> properties, SourceLocation srcLoc, 
MetadataTransactionContext mdTxnCtx)
-            throws AlgebricksException, HyracksDataException {
+            Map<String, String> properties, SourceLocation srcLoc, 
MetadataTransactionContext mdTxnCtx,
+            IApplicationContext appCtx) throws AlgebricksException, 
HyracksDataException {
         // Validate adapter specific properties
         String adapter = externalDetails.getAdapter();
         Map<String, String> details = new HashMap<>(properties);
         details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter);
-        validateAdapterSpecificProperties(details, srcLoc);
+        validateAdapterSpecificProperties(details, srcLoc, appCtx);
     }
 
     /**
@@ -4787,9 +4789,9 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
      *
      * @param configuration external source properties
      */
-    protected void validateAdapterSpecificProperties(Map<String, String> 
configuration, SourceLocation srcLoc)
-            throws CompilationException {
-        ExternalDataUtils.validateAdapterSpecificProperties(configuration, 
srcLoc, warningCollector);
+    protected void validateAdapterSpecificProperties(Map<String, String> 
configuration, SourceLocation srcLoc,
+            IApplicationContext appCtx) throws CompilationException {
+        ExternalDataUtils.validateAdapterSpecificProperties(configuration, 
srcLoc, warningCollector, appCtx);
     }
 
     protected enum CreateResult {
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index bf736b4..1805e7a 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -8,6 +8,7 @@
     "active\.memory\.global\.budget" : 67108864,
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
+    "azure.request.timeout" : 120,
     "compiler\.arrayindex" : true,
     "compiler\.external\.field\.pushdown" : true,
     "compiler\.framesize" : 32768,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index d52cedd..743347a 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -8,6 +8,7 @@
     "active\.memory\.global\.budget" : 67108864,
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
+    "azure.request.timeout" : 120,
     "compiler\.arrayindex" : true,
     "compiler\.external\.field\.pushdown" : true,
     "compiler\.framesize" : 32768,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 4f5267f..4359bd9 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -8,6 +8,7 @@
     "active\.memory\.global\.budget" : 67108864,
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
+    "azure.request.timeout" : 120,
     "compiler\.arrayindex" : true,
     "compiler\.external\.field\.pushdown" : true,
     "compiler\.framesize" : 32768,
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index 46258bf..515aad6 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -51,7 +51,8 @@ public class ExternalProperties extends AbstractProperties {
                 StorageUtil.getIntSizeInBytes(200, 
StorageUtil.StorageUnit.MEGABYTE),
                 "The maximum accepted web request size in bytes"),
         REQUESTS_ARCHIVE_SIZE(NONNEGATIVE_INTEGER, 50, "The maximum number of 
archived requests to maintain"),
-        LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a 
UDF in seconds");
+        LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a 
UDF in seconds"),
+        AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client 
requests in seconds");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -78,6 +79,7 @@ public class ExternalProperties extends AbstractProperties {
                 case MAX_WAIT_ACTIVE_CLUSTER:
                 case MAX_WEB_REQUEST_SIZE:
                 case LIBRARY_DEPLOY_TIMEOUT:
+                case AZURE_REQUEST_TIMEOUT:
                     return Section.COMMON;
                 case CC_JAVA_OPTS:
                 case NC_JAVA_OPTS:
@@ -155,4 +157,7 @@ public class ExternalProperties extends AbstractProperties {
         return accessor.getInt(Option.LIBRARY_DEPLOY_TIMEOUT);
     }
 
+    public int getAzureRequestTimeout() {
+        return accessor.getInt(Option.AZURE_REQUEST_TIMEOUT);
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
index b402f25..cdb3834 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.zip.GZIPInputStream;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -45,9 +46,10 @@ public class AzureBlobInputStream extends 
AbstractExternalInputStream {
     private final BlobServiceClient client;
     private final String container;
 
-    public AzureBlobInputStream(Map<String, String> configuration, 
List<String> filePaths) throws HyracksDataException {
+    public AzureBlobInputStream(IApplicationContext appCtx, Map<String, 
String> configuration, List<String> filePaths)
+            throws HyracksDataException {
         super(configuration, filePaths);
-        this.client = buildAzureClient(configuration);
+        this.client = buildAzureClient(appCtx, configuration);
         this.container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
 
@@ -81,9 +83,10 @@ public class AzureBlobInputStream extends 
AbstractExternalInputStream {
         return true;
     }
 
-    private BlobServiceClient buildAzureClient(Map<String, String> 
configuration) throws HyracksDataException {
+    private BlobServiceClient buildAzureClient(IApplicationContext appCtx, 
Map<String, String> configuration)
+            throws HyracksDataException {
         try {
-            return ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
+            return ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, 
configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index bf904a4..064b319 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.external.api.AsterixInputStream;
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataUtils;
@@ -41,7 +42,10 @@ public class AzureBlobInputStreamFactory extends 
AbstractExternalInputStreamFact
 
     @Override
     public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int 
partition) throws HyracksDataException {
-        return new AzureBlobInputStream(configuration, 
partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+        IApplicationContext appCtx =
+                (IApplicationContext) 
ctx.getJobletContext().getServiceContext().getApplicationContext();
+        return new AzureBlobInputStream(appCtx, configuration,
+                partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
     }
 
     @Override
@@ -49,10 +53,11 @@ public class AzureBlobInputStreamFactory extends 
AbstractExternalInputStreamFact
             throws AlgebricksException {
         super.configure(ctx, configuration, warningCollector);
 
+        IApplicationContext appCtx = (IApplicationContext) 
ctx.getApplicationContext();
         // Ensure the validity of include/exclude
         ExternalDataUtils.validateIncludeExclude(configuration);
         IncludeExcludeMatcher includeExcludeMatcher = 
ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-        BlobServiceClient blobServiceClient = 
ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
+        BlobServiceClient blobServiceClient = 
ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
         List<BlobItem> filesOnly = 
ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
                 includeExcludeMatcher, warningCollector);
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
index b7d142f..e34d188 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.zip.GZIPInputStream;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -45,10 +46,10 @@ public class AzureDataLakeInputStream extends 
AbstractExternalInputStream {
     private final DataLakeServiceClient client;
     private final String container;
 
-    public AzureDataLakeInputStream(Map<String, String> configuration, 
List<String> filePaths)
-            throws HyracksDataException {
+    public AzureDataLakeInputStream(IApplicationContext appCtx, Map<String, 
String> configuration,
+            List<String> filePaths) throws HyracksDataException {
         super(configuration, filePaths);
-        this.client = buildAzureClient(configuration);
+        this.client = buildAzureClient(appCtx, configuration);
         this.container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
 
@@ -82,9 +83,10 @@ public class AzureDataLakeInputStream extends 
AbstractExternalInputStream {
         return true;
     }
 
-    private DataLakeServiceClient buildAzureClient(Map<String, String> 
configuration) throws HyracksDataException {
+    private DataLakeServiceClient buildAzureClient(IApplicationContext appCtx, 
Map<String, String> configuration)
+            throws HyracksDataException {
         try {
-            return 
ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+            return ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, 
configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index e145e1f..e9f8d4c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.external.api.AsterixInputStream;
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataUtils;
@@ -41,7 +42,10 @@ public class AzureDataLakeInputStreamFactory extends 
AbstractExternalInputStream
 
     @Override
     public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int 
partition) throws HyracksDataException {
-        return new AzureDataLakeInputStream(configuration, 
partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+        IApplicationContext appCtx =
+                (IApplicationContext) 
ctx.getJobletContext().getServiceContext().getApplicationContext();
+        return new AzureDataLakeInputStream(appCtx, configuration,
+                partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
     }
 
     @Override
@@ -49,10 +53,11 @@ public class AzureDataLakeInputStreamFactory extends 
AbstractExternalInputStream
             throws AlgebricksException {
         super.configure(ctx, configuration, warningCollector);
 
+        IApplicationContext appCtx = (IApplicationContext) 
ctx.getApplicationContext();
         // Ensure the validity of include/exclude
         ExternalDataUtils.validateIncludeExclude(configuration);
         IncludeExcludeMatcher includeExcludeMatcher = 
ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-        DataLakeServiceClient client = 
ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+        DataLakeServiceClient client = 
ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
         List<PathItem> filesOnly = 
ExternalDataUtils.Azure.listDatalakePathItems(client, configuration,
                 includeExcludeMatcher, warningCollector);
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index 1f82dae..c2251df 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
@@ -45,7 +46,8 @@ public class AzureBlobParquetReaderFactory extends 
HDFSDataSourceFactory {
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration,
             IWarningCollector warningCollector) throws AlgebricksException, 
HyracksDataException {
-        BlobServiceClient blobServiceClient = 
ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
+        IApplicationContext appCtx = (IApplicationContext) 
serviceCtx.getApplicationContext();
+        BlobServiceClient blobServiceClient = 
ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
         //Get endpoint
         String endPoint = extractEndPoint(blobServiceClient.getAccountUrl());
         //Get path
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index 8474a74..db87868 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
@@ -45,7 +46,9 @@ public class AzureDataLakeParquetReaderFactory extends 
HDFSDataSourceFactory {
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration,
             IWarningCollector warningCollector) throws AlgebricksException, 
HyracksDataException {
-        DataLakeServiceClient dataLakeServiceClient = 
ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+        IApplicationContext appCtx = (IApplicationContext) 
serviceCtx.getApplicationContext();
+        DataLakeServiceClient dataLakeServiceClient =
+                ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, 
configuration);
 
         //Get endpoint
         String endPoint = 
extractEndPoint(dataLakeServiceClient.getAccountUrl());
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java
index 8f032d8..18ef150 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java
@@ -21,10 +21,13 @@ package org.apache.asterix.external.input.stream;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IStreamNotificationHandler;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
 
 /**
  * Base class for a source stream that is composed of multiple separate input 
streams. Reading proceeds one stream at
@@ -54,25 +57,30 @@ public abstract class AbstractMultipleInputStream extends 
AsterixInputStream {
 
     @Override
     public final int read(byte[] b, int off, int len) throws IOException {
-        if (in == null) {
-            if (!advance()) {
-                return -1;
+        try {
+            if (in == null) {
+                if (!advance()) {
+                    return -1;
+                }
             }
+            int result = in.read(b, off, len);
+            if (result < 0 && (lastByte != ExternalDataConstants.BYTE_LF)
+                    && (lastByte != ExternalDataConstants.BYTE_CR)) {
+                // return a new line at the end of every file <--Might create 
problems for some cases
+                // depending on the parser implementation-->
+                lastByte = ExternalDataConstants.BYTE_LF;
+                b[off] = ExternalDataConstants.BYTE_LF;
+                return 1;
+            }
+            while ((result < 0) && advance()) {
+                result = in.read(b, off, len);
+            }
+            if (result > 0) {
+                lastByte = b[(off + result) - 1];
+            }
+            return result;
+        } catch (Exception e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ExceptionUtils.getMessageOrToString(e));
         }
-        int result = in.read(b, off, len);
-        if (result < 0 && (lastByte != ExternalDataConstants.BYTE_LF) && 
(lastByte != ExternalDataConstants.BYTE_CR)) {
-            // return a new line at the end of every file <--Might create 
problems for some cases
-            // depending on the parser implementation-->
-            lastByte = ExternalDataConstants.BYTE_LF;
-            b[off] = ExternalDataConstants.BYTE_LF;
-            return 1;
-        }
-        while ((result < 0) && advance()) {
-            result = in.read(b, off, len);
-        }
-        if (result > 0) {
-            lastByte = b[(off + result) - 1];
-        }
-        return result;
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 22040e2..8e38eed 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -90,6 +90,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -139,6 +140,7 @@ import com.azure.storage.blob.BlobServiceClientBuilder;
 import com.azure.storage.blob.models.BlobItem;
 import com.azure.storage.blob.models.ListBlobsOptions;
 import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.common.policy.RequestRetryOptions;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
@@ -597,7 +599,7 @@ public class ExternalDataUtils {
      * @param configuration properties
      */
     public static void validateAdapterSpecificProperties(Map<String, String> 
configuration, SourceLocation srcLoc,
-            IWarningCollector collector) throws CompilationException {
+            IWarningCollector collector, IApplicationContext appCtx) throws 
CompilationException {
         String type = 
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
 
         switch (type) {
@@ -605,10 +607,10 @@ public class ExternalDataUtils {
                 AwsS3.validateProperties(configuration, srcLoc, collector);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
-                Azure.validateAzureBlobProperties(configuration, srcLoc, 
collector);
+                Azure.validateAzureBlobProperties(configuration, srcLoc, 
collector, appCtx);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE:
-                Azure.validateAzureDataLakeProperties(configuration, srcLoc, 
collector);
+                Azure.validateAzureDataLakeProperties(configuration, srcLoc, 
collector, appCtx);
                 break;
             case KEY_ADAPTER_NAME_GCS:
                 GCS.validateProperties(configuration, srcLoc, collector);
@@ -1277,8 +1279,8 @@ public class ExternalDataUtils {
          * @param configuration properties
          * @return client
          */
-        public static BlobServiceClient buildAzureBlobClient(Map<String, 
String> configuration)
-                throws CompilationException {
+        public static BlobServiceClient 
buildAzureBlobClient(IApplicationContext appCtx,
+                Map<String, String> configuration) throws CompilationException 
{
             String managedIdentityId = 
configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
             String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
             String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
@@ -1292,6 +1294,9 @@ public class ExternalDataUtils {
 
             // Client builder
             BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
+            int timeout = 
appCtx.getExternalProperties().getAzureRequestTimeout();
+            RequestRetryOptions requestRetryOptions = new 
RequestRetryOptions(null, null, timeout, null, null, null);
+            builder.retryOptions(requestRetryOptions);
 
             // Endpoint is required
             if (endpoint == null) {
@@ -1428,8 +1433,8 @@ public class ExternalDataUtils {
          * @param configuration properties
          * @return client
          */
-        public static DataLakeServiceClient 
buildAzureDatalakeClient(Map<String, String> configuration)
-                throws CompilationException {
+        public static DataLakeServiceClient 
buildAzureDatalakeClient(IApplicationContext appCtx,
+                Map<String, String> configuration) throws CompilationException 
{
             String managedIdentityId = 
configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
             String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
             String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
@@ -1443,6 +1448,9 @@ public class ExternalDataUtils {
 
             // Client builder
             DataLakeServiceClientBuilder builder = new 
DataLakeServiceClientBuilder();
+            int timeout = 
appCtx.getExternalProperties().getAzureRequestTimeout();
+            RequestRetryOptions requestRetryOptions = new 
RequestRetryOptions(null, null, timeout, null, null, null);
+            builder.retryOptions(requestRetryOptions);
 
             // Endpoint is required
             if (endpoint == null) {
@@ -1702,7 +1710,7 @@ public class ExternalDataUtils {
          * @throws CompilationException Compilation exception
          */
         public static void validateAzureBlobProperties(Map<String, String> 
configuration, SourceLocation srcLoc,
-                IWarningCollector collector) throws CompilationException {
+                IWarningCollector collector, IApplicationContext appCtx) 
throws CompilationException {
 
             // check if the format property is present
             if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
@@ -1715,7 +1723,7 @@ public class ExternalDataUtils {
             BlobServiceClient blobServiceClient;
             try {
                 String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-                blobServiceClient = buildAzureBlobClient(configuration);
+                blobServiceClient = buildAzureBlobClient(appCtx, 
configuration);
                 BlobContainerClient blobContainer = 
blobServiceClient.getBlobContainerClient(container);
 
                 // Get all objects in a container and extract the paths to 
files
@@ -1741,7 +1749,7 @@ public class ExternalDataUtils {
          * @throws CompilationException Compilation exception
          */
         public static void validateAzureDataLakeProperties(Map<String, String> 
configuration, SourceLocation srcLoc,
-                IWarningCollector collector) throws CompilationException {
+                IWarningCollector collector, IApplicationContext appCtx) 
throws CompilationException {
 
             // check if the format property is present
             if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
@@ -1754,7 +1762,7 @@ public class ExternalDataUtils {
             DataLakeServiceClient dataLakeServiceClient;
             try {
                 String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-                dataLakeServiceClient = 
buildAzureDatalakeClient(configuration);
+                dataLakeServiceClient = buildAzureDatalakeClient(appCtx, 
configuration);
                 DataLakeFileSystemClient fileSystemClient = 
dataLakeServiceClient.getFileSystemClient(container);
 
                 // Get all objects in a container and extract the paths to 
files

Reply via email to