This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit b9d29911b46f2c17420a7d6438a9946133f89501
Author: Hussain Towaileb <[email protected]>
AuthorDate: Sun Oct 5 15:23:01 2025 +0300

    [NO ISSUE]: Refactor AWS common properties
    
    In anticipation of upcoming AWS clients other than S3,
    factor out the common AWS properties into a separate
    class, outside of the S3-specific classes.
    
    Ext-ref: MB-68835
    Change-Id: I4d2a50d361ad949d4d3aeee3208bbd8dbb3e83a2
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20463
    Tested-by: Hussain Towaileb <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
    Reviewed-by: Hussain Towaileb <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
---
 .../external/ExternalCredentialsCacheUpdater.java  |   4 +-
 .../org/apache/asterix/utils/RedactionUtil.java    |   2 +-
 .../cloud/clients/aws/s3/S3ClientConfig.java       |   4 +-
 .../cloud/writer/S3ExternalFileWriterFactory.java  |   4 +-
 .../input/record/reader/aws/AwsS3InputStream.java  |   9 +-
 .../reader/aws/delta/AwsS3DeltaReaderFactory.java  |   2 +-
 .../aws/parquet/AwsS3ParquetReaderFactory.java     |   2 +-
 .../asterix/external/util/ExternalDataUtils.java   |  11 +-
 .../asterix/external/util/aws/AwsConstants.java    |  42 ++
 .../apache/asterix/external/util/aws/AwsUtils.java | 286 ++++++++++
 .../asterix/external/util/aws/s3/S3AuthUtils.java  | 577 ---------------------
 .../asterix/external/util/aws/s3/S3Constants.java  |  16 -
 .../asterix/external/util/aws/s3/S3Utils.java      | 317 ++++++++++-
 .../input/record/reader/awss3/AwsS3Test.java       |   4 +-
 14 files changed, 657 insertions(+), 623 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
index 10f536bb71..a9541260c9 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -41,7 +41,7 @@ import 
org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
+import org.apache.asterix.external.util.aws.AwsUtils;
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.messaging.NCMessageBroker;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -94,7 +94,7 @@ public class ExternalCredentialsCacheUpdater implements 
IExternalCredentialsCach
             validateClusterState();
             try {
                 LOGGER.info("attempting to update AWS credentials for {}", 
key);
-                AwsCredentialsProvider newCredentials = 
S3AuthUtils.assumeRoleAndGetCredentials(configuration);
+                AwsCredentialsProvider newCredentials = 
AwsUtils.assumeRoleAndGetCredentials(configuration);
                 LOGGER.info("updated AWS credentials successfully for {}", 
key);
                 credentials = (AwsSessionCredentials) 
newCredentials.resolveCredentials();
                 appCtx.getExternalCredentialsCache().put(key, credentials);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
index d8b47b88b5..8ef2882fcb 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
@@ -20,7 +20,7 @@ package org.apache.asterix.utils;
 
 import static java.util.regex.Pattern.CASE_INSENSITIVE;
 import static java.util.regex.Pattern.DOTALL;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.SECRET_ACCESS_KEY_FIELD_NAME;
 
 import java.util.regex.Pattern;
 
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 025af0809f..f05a0e5820 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -22,7 +22,7 @@ import java.util.Map;
 import java.util.Objects;
 
 import org.apache.asterix.common.config.CloudProperties;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.external.util.aws.AwsConstants;
 
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -90,7 +90,7 @@ public final class S3ClientConfig {
 
     public static S3ClientConfig of(Map<String, String> configuration, int 
writeBufferSize) {
         // Used to determine local vs. actual S3
-        String endPoint = 
configuration.getOrDefault(S3Constants.SERVICE_END_POINT_FIELD_NAME, "");
+        String endPoint = 
configuration.getOrDefault(AwsConstants.SERVICE_END_POINT_FIELD_NAME, "");
         // Disabled
         long profilerLogInterval = 0;
 
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index b3747468cf..2d618149c1 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -27,8 +27,8 @@ import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
@@ -67,7 +67,7 @@ public final class S3ExternalFileWriterFactory extends 
AbstractCloudExternalFile
     @Override
     ICloudClient createCloudClient(IApplicationContext appCtx) throws 
CompilationException {
         S3ClientConfig config = S3ClientConfig.of(configuration, 
writeBufferSize);
-        return new S3CloudClient(config, S3AuthUtils.buildAwsS3Client(appCtx, 
configuration),
+        return new S3CloudClient(config, S3Utils.buildClient(appCtx, 
configuration),
                 ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 60205ece05..bea096671c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -35,7 +35,8 @@ import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmb
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import 
org.apache.asterix.external.input.record.reader.stream.AvailableInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
+import org.apache.asterix.external.util.aws.AwsUtils;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
@@ -96,7 +97,7 @@ public class AwsS3InputStream extends 
AbstractExternalInputStream {
                 LOGGER.debug(() -> "Key " + userData(request.key()) + " was 
not found in bucket {}" + request.bucket());
                 return false;
             } catch (S3Exception ex) {
-                if (S3AuthUtils.isArnAssumedRoleExpiredToken(configuration, 
ex.awsErrorDetails().errorCode())) {
+                if (AwsUtils.isArnAssumedRoleExpiredToken(configuration, 
ex.awsErrorDetails().errorCode())) {
                     LOGGER.debug(() -> "Expired AWS assume role session, will 
attempt to refresh the session");
                     rebuildAwsS3Client(configuration);
                     LOGGER.debug(() -> "Successfully refreshed AWS assume role 
session");
@@ -120,7 +121,7 @@ public class AwsS3InputStream extends 
AbstractExternalInputStream {
     }
 
     private boolean shouldRetry(String errorCode, int currentRetry) {
-        return currentRetry < MAX_RETRIES && 
S3AuthUtils.isRetryableError(errorCode);
+        return currentRetry < MAX_RETRIES && 
S3Utils.isRetryableError(errorCode);
     }
 
     @Override
@@ -148,7 +149,7 @@ public class AwsS3InputStream extends 
AbstractExternalInputStream {
 
     private S3Client buildAwsS3Client(Map<String, String> configuration) 
throws HyracksDataException {
         try {
-            return S3AuthUtils.buildAwsS3Client(ncAppCtx, configuration);
+            return S3Utils.buildClient(ncAppCtx, configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 7d678a9e46..51373a1d94 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
-import static 
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
+import static 
org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf;
 
 import java.util.Collections;
 import java.util.List;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index f5dc7a2eb6..833cba541e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.external.input.record.reader.aws.parquet;
 
-import static 
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
+import static 
org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf;
 import static org.apache.asterix.external.util.aws.s3.S3Utils.listS3Objects;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 9926e09e4c..ea7b9c5a68 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -32,7 +32,7 @@ import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PATH;
 import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
 import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
-import static 
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
+import static 
org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf;
 import static 
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
 import static 
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
 import static 
org.apache.asterix.external.util.google.gcs.GCSAuthUtils.configureHdfsJobConf;
@@ -81,7 +81,7 @@ import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna
 import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
-import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
+import org.apache.asterix.external.util.aws.AwsConstants;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;
@@ -603,8 +603,9 @@ public class ExternalDataUtils {
         // If the table is in S3
         if 
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3))
 {
 
-            conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-            conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, 
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+            conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(AwsConstants.ACCESS_KEY_ID_FIELD_NAME));
+            conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
+                    
configuration.get(AwsConstants.SECRET_ACCESS_KEY_FIELD_NAME));
             tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
                     + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
                     + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
@@ -728,7 +729,7 @@ public class ExternalDataUtils {
 
         switch (type) {
             case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
-                S3AuthUtils.validateProperties(appCtx, configuration, srcLoc, 
collector);
+                S3Utils.validateProperties(appCtx, configuration, srcLoc, 
collector);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
                 validateAzureBlobProperties(configuration, srcLoc, collector, 
appCtx);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsConstants.java
new file mode 100644
index 0000000000..7868bdf2ca
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsConstants.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws;
+
+public class AwsConstants {
+    private AwsConstants() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    // Authentication specific parameters
+    public static final String REGION_FIELD_NAME = "region";
+    public static final String CROSS_REGION_FIELD_NAME = "crossRegion";
+    public static final String INSTANCE_PROFILE_FIELD_NAME = "instanceProfile";
+    public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
+    public static final String SECRET_ACCESS_KEY_FIELD_NAME = 
"secretAccessKey";
+    public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
+    public static final String ROLE_ARN_FIELD_NAME = "roleArn";
+    public static final String EXTERNAL_ID_FIELD_NAME = "externalId";
+    public static final String SERVICE_END_POINT_FIELD_NAME = 
"serviceEndpoint";
+
+    // AWS specific error codes
+    public static final String ERROR_INTERNAL_ERROR = "InternalError";
+    public static final String ERROR_SLOW_DOWN = "SlowDown";
+    public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
+    public static final String ERROR_EXPIRED_TOKEN = "ExpiredToken";
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
new file mode 100644
index 0000000000..a648853a46
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws;
+
+import static 
org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
+import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
+import static 
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
+import static 
org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.ACCESS_KEY_ID_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.CROSS_REGION_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.ERROR_EXPIRED_TOKEN;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.EXTERNAL_ID_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.INSTANCE_PROFILE_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.ROLE_ARN_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.SECRET_ACCESS_KEY_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.SESSION_TOKEN_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import 
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+public class AwsUtils {
+
+    public enum AuthenticationType {
+        ANONYMOUS,
+        ARN_ASSUME_ROLE,
+        INSTANCE_PROFILE,
+        ACCESS_KEYS,
+        BAD_AUTHENTICATION
+    }
+
+    private AwsUtils() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    public static boolean isArnAssumedRoleExpiredToken(Map<String, String> 
configuration, String errorCode) {
+        return ERROR_EXPIRED_TOKEN.equals(errorCode)
+                && getAuthenticationType(configuration) == 
AuthenticationType.ARN_ASSUME_ROLE;
+    }
+
+    public static AwsCredentialsProvider 
buildCredentialsProvider(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        AuthenticationType authenticationType = 
getAuthenticationType(configuration);
+        switch (authenticationType) {
+            case ANONYMOUS:
+                return AnonymousCredentialsProvider.create();
+            case ARN_ASSUME_ROLE:
+                return getTrustAccountCredentials(appCtx, configuration);
+            case INSTANCE_PROFILE:
+                return getInstanceProfileCredentials(configuration);
+            case ACCESS_KEYS:
+                return getAccessKeyCredentials(configuration);
+            default:
+                // missing required creds, report correct error message
+                String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
+                if (externalId != null) {
+                    throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
+                            EXTERNAL_ID_FIELD_NAME);
+                } else {
+                    throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
+                            SESSION_TOKEN_FIELD_NAME);
+                }
+        }
+    }
+
+    public static Region validateAndGetRegion(String regionId) throws 
CompilationException {
+        List<Region> regions = S3Client.serviceMetadata().regions();
+        Optional<Region> selectedRegion = regions.stream().filter(region -> 
region.id().equals(regionId)).findFirst();
+
+        if (selectedRegion.isEmpty()) {
+            throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
+        }
+        return selectedRegion.get();
+    }
+
+    public static AuthenticationType getAuthenticationType(Map<String, String> 
configuration) {
+        String roleArn = configuration.get(ROLE_ARN_FIELD_NAME);
+        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+
+        if (noAuth(configuration)) {
+            return AuthenticationType.ANONYMOUS;
+        } else if (roleArn != null) {
+            return AuthenticationType.ARN_ASSUME_ROLE;
+        } else if (instanceProfile != null) {
+            return AuthenticationType.INSTANCE_PROFILE;
+        } else if (accessKeyId != null || secretAccessKey != null) {
+            return AuthenticationType.ACCESS_KEYS;
+        } else {
+            return AuthenticationType.BAD_AUTHENTICATION;
+        }
+    }
+
+    public static boolean validateAndGetCrossRegion(String crossRegion) throws 
CompilationException {
+        if (crossRegion == null) {
+            return false;
+        }
+        if (!"true".equalsIgnoreCase(crossRegion) && 
!"false".equalsIgnoreCase(crossRegion)) {
+            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
CROSS_REGION_FIELD_NAME, "true, false");
+        }
+        return Boolean.parseBoolean(crossRegion);
+    }
+
+    private static boolean noAuth(Map<String, String> configuration) {
+        return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, 
ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
+                ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, 
SESSION_TOKEN_FIELD_NAME) == null;
+    }
+
+    /**
+     * Returns the cached credentials if valid, otherwise, generates new 
credentials
+     *
+     * @param appCtx application context
+     * @param configuration configuration
+     * @return returns the cached credentials if valid, otherwise, generates 
new credentials
+     * @throws CompilationException CompilationException
+     */
+    public static AwsCredentialsProvider 
getTrustAccountCredentials(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
+        Object credentialsObject = 
cache.get(configuration.get(ExternalDataConstants.KEY_ENTITY_ID));
+        if (credentialsObject != null) {
+            return () -> (AwsSessionCredentials) credentialsObject;
+        }
+        IExternalCredentialsCacheUpdater cacheUpdater = 
appCtx.getExternalCredentialsCacheUpdater();
+        AwsSessionCredentials credentials;
+        try {
+            credentials = (AwsSessionCredentials) 
cacheUpdater.generateAndCacheCredentials(configuration);
+        } catch (HyracksDataException ex) {
+            throw new 
CompilationException(ErrorCode.FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION, 
ex, ex.getMessage());
+        }
+
+        return () -> credentials;
+    }
+
+    /**
+     * Assume role using provided credentials and return the new credentials
+     *
+     * @param configuration configuration
+     * @return return credentials from the assume role
+     * @throws CompilationException CompilationException
+     */
+    public static AwsCredentialsProvider 
assumeRoleAndGetCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        String regionId = configuration.get(REGION_FIELD_NAME);
+        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
+        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
+        Region region = validateAndGetRegion(regionId);
+
+        AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder();
+        builder.roleArn(arnRole);
+        builder.roleSessionName(UUID.randomUUID().toString());
+        builder.durationSeconds(900); // TODO(htowaileb): configurable? Can be 
900 to 43200 (15 mins to 12 hours)
+        if (externalId != null) {
+            builder.externalId(externalId);
+        }
+        AssumeRoleRequest request = builder.build();
+        AwsCredentialsProvider credentialsProvider = 
getCredentialsToAssumeRole(configuration);
+
+        // assume the role from the provided arn
+        try (StsClient stsClient =
+                
StsClient.builder().region(region).credentialsProvider(credentialsProvider).build())
 {
+            AssumeRoleResponse response = stsClient.assumeRole(request);
+            Credentials credentials = response.credentials();
+            return 
StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(),
+                    credentials.secretAccessKey(), 
credentials.sessionToken()));
+        } catch (SdkException ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
+        }
+    }
+
+    private static AwsCredentialsProvider 
getCredentialsToAssumeRole(Map<String, String> configuration)
+            throws CompilationException {
+        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        if (instanceProfile != null) {
+            return getInstanceProfileCredentials(configuration);
+        } else if (accessKeyId != null || secretAccessKey != null) {
+            return getAccessKeyCredentials(configuration);
+        } else {
+            throw new 
CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
+        }
+    }
+
+    private static AwsCredentialsProvider 
getInstanceProfileCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+
+        // only "true" value is allowed
+        if (!"true".equalsIgnoreCase(instanceProfile)) {
+            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
INSTANCE_PROFILE_FIELD_NAME, "true");
+        }
+
+        String notAllowed = getNonNull(configuration, 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
+                SESSION_TOKEN_FIELD_NAME);
+        if (notAllowed != null) {
+            throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+                    INSTANCE_PROFILE_FIELD_NAME);
+        }
+        return InstanceProfileCredentialsProvider.create();
+    }
+
+    private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, 
String> configuration)
+            throws CompilationException {
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+
+        if (accessKeyId == null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
+                    SECRET_ACCESS_KEY_FIELD_NAME);
+        }
+        if (secretAccessKey == null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
SECRET_ACCESS_KEY_FIELD_NAME,
+                    ACCESS_KEY_ID_FIELD_NAME);
+        }
+
+        // use session token if provided
+        if (sessionToken != null) {
+            return StaticCredentialsProvider
+                    .create(AwsSessionCredentials.create(accessKeyId, 
secretAccessKey, sessionToken));
+        } else {
+            return 
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, 
secretAccessKey));
+        }
+    }
+
+    private static String getNonNull(Map<String, String> configuration, 
String... fieldNames) {
+        for (String fieldName : fieldNames) {
+            if (configuration.get(fieldName) != null) {
+                return fieldName;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Generates a random external ID to be used in cross-account role 
assumption.
+     *
+     * @return external id
+     */
+    public static String generateExternalId() {
+        return UUID.randomUUID().toString();
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
deleted file mode 100644
index 90bb04f309..0000000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ /dev/null
@@ -1,577 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util.aws.s3;
-
-import static 
org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
-import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
-import static 
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
-import static 
org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
-import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
-import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
-import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
-import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
-import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.CROSS_REGION_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_EXPIRED_TOKEN;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.EXTERNAL_ID_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUMED_ROLE;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_ARN;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_EXTERNAL_ID;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INSTANCE_PROFILE;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_REGION;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SIMPLE;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMPORARY;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.INSTANCE_PROFILE_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.PATH_STYLE_ADDRESSING_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.REGION_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ROLE_ARN_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SESSION_TOKEN_FIELD_NAME;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
-
-import org.apache.asterix.common.api.IApplicationContext;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.external.IExternalCredentialsCache;
-import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataPrefix;
-import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.api.util.CleanupUtils;
-
-import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import 
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
-import software.amazon.awssdk.services.s3.model.S3Exception;
-import software.amazon.awssdk.services.s3.model.S3Response;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
-import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
-import software.amazon.awssdk.services.sts.model.Credentials;
-
-public class S3AuthUtils {
-    enum AuthenticationType {
-        ANONYMOUS,
-        ARN_ASSUME_ROLE,
-        INSTANCE_PROFILE,
-        ACCESS_KEYS,
-        BAD_AUTHENTICATION
-    }
-
-    private S3AuthUtils() {
-        throw new AssertionError("do not instantiate");
-    }
-
-    public static boolean isRetryableError(String errorCode) {
-        return errorCode.equals(ERROR_INTERNAL_ERROR) || 
errorCode.equals(ERROR_SLOW_DOWN);
-    }
-
-    public static boolean isArnAssumedRoleExpiredToken(Map<String, String> 
configuration, String errorCode) {
-        return ERROR_EXPIRED_TOKEN.equals(errorCode)
-                && getAuthenticationType(configuration) == 
AuthenticationType.ARN_ASSUME_ROLE;
-    }
-
-    /**
-     * Builds the S3 client using the provided configuration
-     *
-     * @param configuration properties
-     * @return S3 client
-     * @throws CompilationException CompilationException
-     */
-    public static S3Client buildAwsS3Client(IApplicationContext appCtx, 
Map<String, String> configuration)
-            throws CompilationException {
-        String regionId = configuration.get(REGION_FIELD_NAME);
-        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
-        Region region = validateAndGetRegion(regionId);
-        boolean crossRegion = 
validateAndGetCrossRegion(configuration.get(CROSS_REGION_FIELD_NAME));
-        AwsCredentialsProvider credentialsProvider = 
buildCredentialsProvider(appCtx, configuration);
-
-        S3ClientBuilder builder = S3Client.builder();
-        builder.region(region);
-        builder.crossRegionAccessEnabled(crossRegion);
-        builder.credentialsProvider(credentialsProvider);
-
-        // Validate the service endpoint if present
-        if (serviceEndpoint != null) {
-            try {
-                URI uri = new URI(serviceEndpoint);
-                try {
-                    builder.endpointOverride(uri);
-                } catch (NullPointerException ex) {
-                    throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
-                }
-            } catch (URISyntaxException ex) {
-                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
-                        String.format("Invalid service endpoint %s", 
serviceEndpoint));
-            }
-        }
-
-        boolean pathStyleAddressing =
-                
validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME),
 serviceEndpoint);
-        builder.forcePathStyle(pathStyleAddressing);
-        return builder.build();
-    }
-
-    public static AwsCredentialsProvider 
buildCredentialsProvider(IApplicationContext appCtx,
-            Map<String, String> configuration) throws CompilationException {
-        AuthenticationType authenticationType = 
getAuthenticationType(configuration);
-        switch (authenticationType) {
-            case ANONYMOUS:
-                return AnonymousCredentialsProvider.create();
-            case ARN_ASSUME_ROLE:
-                return getTrustAccountCredentials(appCtx, configuration);
-            case INSTANCE_PROFILE:
-                return getInstanceProfileCredentials(configuration);
-            case ACCESS_KEYS:
-                return getAccessKeyCredentials(configuration);
-            default:
-                // missing required creds, report correct error message
-                String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
-                if (externalId != null) {
-                    throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
-                            EXTERNAL_ID_FIELD_NAME);
-                } else {
-                    throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
-                            SESSION_TOKEN_FIELD_NAME);
-                }
-        }
-    }
-
-    public static Region validateAndGetRegion(String regionId) throws 
CompilationException {
-        List<Region> regions = S3Client.serviceMetadata().regions();
-        Optional<Region> selectedRegion = regions.stream().filter(region -> 
region.id().equals(regionId)).findFirst();
-
-        if (selectedRegion.isEmpty()) {
-            throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
-        }
-        return selectedRegion.get();
-    }
-
-    private static AuthenticationType getAuthenticationType(Map<String, 
String> configuration) {
-        String roleArn = configuration.get(ROLE_ARN_FIELD_NAME);
-        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
-        if (noAuth(configuration)) {
-            return AuthenticationType.ANONYMOUS;
-        } else if (roleArn != null) {
-            return AuthenticationType.ARN_ASSUME_ROLE;
-        } else if (instanceProfile != null) {
-            return AuthenticationType.INSTANCE_PROFILE;
-        } else if (accessKeyId != null || secretAccessKey != null) {
-            return AuthenticationType.ACCESS_KEYS;
-        } else {
-            return AuthenticationType.BAD_AUTHENTICATION;
-        }
-    }
-
-    public static boolean validateAndGetPathStyleAddressing(String 
pathStyleAddressing, String endpoint)
-            throws CompilationException {
-        if (pathStyleAddressing == null) {
-            return endpoint != null && !endpoint.isEmpty();
-        }
-        validatePathStyleAddressing(pathStyleAddressing);
-        return Boolean.parseBoolean(pathStyleAddressing);
-    }
-
-    public static void validatePathStyleAddressing(String pathStyleAddressing) 
throws CompilationException {
-        if (pathStyleAddressing == null) {
-            return;
-        }
-        if (!"true".equalsIgnoreCase(pathStyleAddressing) && 
!"false".equalsIgnoreCase(pathStyleAddressing)) {
-            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
PATH_STYLE_ADDRESSING_FIELD_NAME,
-                    "true, false");
-        }
-    }
-
-    public static boolean validateAndGetCrossRegion(String crossRegion) throws 
CompilationException {
-        if (crossRegion == null) {
-            return false;
-        }
-        if (!"true".equalsIgnoreCase(crossRegion) && 
!"false".equalsIgnoreCase(crossRegion)) {
-            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
CROSS_REGION_FIELD_NAME, "true, false");
-        }
-        return Boolean.parseBoolean(crossRegion);
-    }
-
-    private static boolean noAuth(Map<String, String> configuration) {
-        return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, 
ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
-                ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, 
SESSION_TOKEN_FIELD_NAME) == null;
-    }
-
-    /**
-     * Returns the cached credentials if valid, otherwise, generates new 
credentials
-     *
-     * @param appCtx application context
-     * @param configuration configuration
-     * @return returns the cached credentials if valid, otherwise, generates 
new credentials
-     * @throws CompilationException CompilationException
-     */
-    public static AwsCredentialsProvider 
getTrustAccountCredentials(IApplicationContext appCtx,
-            Map<String, String> configuration) throws CompilationException {
-        IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
-        Object credentialsObject = 
cache.get(configuration.get(ExternalDataConstants.KEY_ENTITY_ID));
-        if (credentialsObject != null) {
-            return () -> (AwsSessionCredentials) credentialsObject;
-        }
-        IExternalCredentialsCacheUpdater cacheUpdater = 
appCtx.getExternalCredentialsCacheUpdater();
-        AwsSessionCredentials credentials;
-        try {
-            credentials = (AwsSessionCredentials) 
cacheUpdater.generateAndCacheCredentials(configuration);
-        } catch (HyracksDataException ex) {
-            throw new 
CompilationException(ErrorCode.FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION, 
ex, ex.getMessage());
-        }
-
-        return () -> credentials;
-    }
-
-    /**
-     * Assume role using provided credentials and return the new credentials
-     *
-     * @param configuration configuration
-     * @return return credentials from the assume role
-     * @throws CompilationException CompilationException
-     */
-    public static AwsCredentialsProvider 
assumeRoleAndGetCredentials(Map<String, String> configuration)
-            throws CompilationException {
-        String regionId = configuration.get(REGION_FIELD_NAME);
-        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
-        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
-        Region region = validateAndGetRegion(regionId);
-
-        AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder();
-        builder.roleArn(arnRole);
-        builder.roleSessionName(UUID.randomUUID().toString());
-        builder.durationSeconds(900); // TODO(htowaileb): configurable? Can be 
900 to 43200 (15 mins to 12 hours)
-        if (externalId != null) {
-            builder.externalId(externalId);
-        }
-        AssumeRoleRequest request = builder.build();
-        AwsCredentialsProvider credentialsProvider = 
getCredentialsToAssumeRole(configuration);
-
-        // assume the role from the provided arn
-        try (StsClient stsClient =
-                
StsClient.builder().region(region).credentialsProvider(credentialsProvider).build())
 {
-            AssumeRoleResponse response = stsClient.assumeRole(request);
-            Credentials credentials = response.credentials();
-            return 
StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(),
-                    credentials.secretAccessKey(), 
credentials.sessionToken()));
-        } catch (SdkException ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
-        }
-    }
-
-    private static AwsCredentialsProvider 
getCredentialsToAssumeRole(Map<String, String> configuration)
-            throws CompilationException {
-        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        if (instanceProfile != null) {
-            return getInstanceProfileCredentials(configuration);
-        } else if (accessKeyId != null || secretAccessKey != null) {
-            return getAccessKeyCredentials(configuration);
-        } else {
-            throw new 
CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
-        }
-    }
-
-    private static AwsCredentialsProvider 
getInstanceProfileCredentials(Map<String, String> configuration)
-            throws CompilationException {
-        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-
-        // only "true" value is allowed
-        if (!"true".equalsIgnoreCase(instanceProfile)) {
-            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
INSTANCE_PROFILE_FIELD_NAME, "true");
-        }
-
-        String notAllowed = getNonNull(configuration, 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                SESSION_TOKEN_FIELD_NAME);
-        if (notAllowed != null) {
-            throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                    INSTANCE_PROFILE_FIELD_NAME);
-        }
-        return InstanceProfileCredentialsProvider.create();
-    }
-
-    private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, 
String> configuration)
-            throws CompilationException {
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
-
-        if (accessKeyId == null) {
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
-                    SECRET_ACCESS_KEY_FIELD_NAME);
-        }
-        if (secretAccessKey == null) {
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
SECRET_ACCESS_KEY_FIELD_NAME,
-                    ACCESS_KEY_ID_FIELD_NAME);
-        }
-
-        // use session token if provided
-        if (sessionToken != null) {
-            return StaticCredentialsProvider
-                    .create(AwsSessionCredentials.create(accessKeyId, 
secretAccessKey, sessionToken));
-        } else {
-            return 
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, 
secretAccessKey));
-        }
-    }
-
-    private static String getNonNull(Map<String, String> configuration, 
String... fieldNames) {
-        for (String fieldName : fieldNames) {
-            if (configuration.get(fieldName) != null) {
-                return fieldName;
-            }
-        }
-        return null;
-    }
-
-    public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, 
JobConf conf,
-            Map<String, String> configuration) throws CompilationException {
-        configureAwsS3HdfsJobConf(appCtx, conf, configuration, 0);
-    }
-
-    /**
-     * Builds the S3 client using the provided configuration
-     *
-     * @param appCtx application context
-     * @param configuration      properties
-     * @param numberOfPartitions number of partitions in the cluster
-     */
-    public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, 
JobConf jobConf,
-            Map<String, String> configuration, int numberOfPartitions) throws 
CompilationException {
-        setHadoopCredentials(appCtx, jobConf, configuration);
-        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-        Region region = 
validateAndGetRegion(configuration.get(REGION_FIELD_NAME));
-        jobConf.set(HADOOP_REGION, region.toString());
-        if (serviceEndpoint != null) {
-            // Validation of the URL should be done at hadoop-aws level
-            jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        } else {
-            //Region is ignored and buckets could be found by the central 
endpoint
-            jobConf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
-        }
-
-        boolean pathStyleAddressing =
-                
validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME),
 serviceEndpoint);
-        if (pathStyleAddressing) {
-            jobConf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
-        } else {
-            jobConf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.FALSE);
-        }
-
-        /*
-         * Set the size of S3 connection pool to be the number of partitions
-         */
-        if (numberOfPartitions != 0) {
-            jobConf.set(HADOOP_S3_CONNECTION_POOL_SIZE, 
String.valueOf(numberOfPartitions));
-        }
-    }
-
-    /**
-     * Sets the credentials provider type and the credentials to hadoop based 
on the provided configuration
-     *
-     * @param jobConf hadoop job config
-     * @param configuration external details configuration
-     */
-    private static void setHadoopCredentials(IApplicationContext appCtx, 
JobConf jobConf,
-            Map<String, String> configuration) throws CompilationException {
-        AuthenticationType authenticationType = 
getAuthenticationType(configuration);
-        switch (authenticationType) {
-            case ANONYMOUS:
-                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS);
-                break;
-            case ARN_ASSUME_ROLE:
-                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, 
HADOOP_ASSUMED_ROLE);
-                jobConf.set(HADOOP_ASSUME_ROLE_ARN, 
configuration.get(ROLE_ARN_FIELD_NAME));
-                jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, 
configuration.get(EXTERNAL_ID_FIELD_NAME));
-                jobConf.set(HADOOP_ASSUME_ROLE_SESSION_NAME, "parquet-" + 
UUID.randomUUID());
-
-                // hadoop accepts time 15m to 1h, we will base it on the 
provided configuration
-                int durationInSeconds = 
appCtx.getExternalProperties().getAwsAssumeRoleDuration();
-                String hadoopDuration = getHadoopDuration(durationInSeconds);
-                jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, 
hadoopDuration);
-
-                // TODO: this assumes basic keys always, also support if we 
use InstanceProfile to assume a role
-                jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY, 
HADOOP_SIMPLE);
-                jobConf.set(HADOOP_ACCESS_KEY_ID, 
configuration.get(ACCESS_KEY_ID_FIELD_NAME));
-                jobConf.set(HADOOP_SECRET_ACCESS_KEY, 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME));
-                break;
-            case INSTANCE_PROFILE:
-                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, 
HADOOP_INSTANCE_PROFILE);
-                break;
-            case ACCESS_KEYS:
-                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_SIMPLE);
-                jobConf.set(HADOOP_ACCESS_KEY_ID, 
configuration.get(ACCESS_KEY_ID_FIELD_NAME));
-                jobConf.set(HADOOP_SECRET_ACCESS_KEY, 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME));
-                if (configuration.get(SESSION_TOKEN_FIELD_NAME) != null) {
-                    jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, 
HADOOP_TEMPORARY);
-                    jobConf.set(HADOOP_SESSION_TOKEN, 
configuration.get(SESSION_TOKEN_FIELD_NAME));
-                }
-                break;
-            case BAD_AUTHENTICATION:
-                throw new 
CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
-        }
-    }
-
-    /**
-     * Hadoop accepts duration values from 15m to 1h (in this format). We will 
base this on the configured
-     * duration in seconds. If the time exceeds 1 hour, we will return 1h
-     *
-     * @param seconds configured duration in seconds
-     * @return hadoop updated duration
-     */
-    private static String getHadoopDuration(int seconds) {
-        // constants for time thresholds
-        final int FIFTEEN_MINUTES_IN_SECONDS = 15 * 60;
-        final int ONE_HOUR_IN_SECONDS = 60 * 60;
-
-        // Adjust seconds to fit within bounds
-        if (seconds < FIFTEEN_MINUTES_IN_SECONDS) {
-            seconds = FIFTEEN_MINUTES_IN_SECONDS;
-        } else if (seconds > ONE_HOUR_IN_SECONDS) {
-            seconds = ONE_HOUR_IN_SECONDS;
-        }
-
-        // Convert seconds to minutes
-        int minutes = seconds / 60;
-
-        // Format the result
-        if (minutes == 60) {
-            return "1h";
-        } else {
-            return minutes + "m";
-        }
-    }
-
-    /**
-     * Validate external dataset properties
-     *
-     * @param configuration properties
-     * @throws CompilationException Compilation exception
-     */
-    public static void validateProperties(IApplicationContext appCtx, 
Map<String, String> configuration,
-            SourceLocation srcLoc, IWarningCollector collector) throws 
CompilationException {
-        if (isDeltaTable(configuration)) {
-            validateDeltaTableProperties(configuration);
-        }
-        // check if the format property is present
-        else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
-            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_FORMAT);
-        }
-
-        validateIncludeExclude(configuration);
-        try {
-            // TODO(htowaileb): maybe something better, this will check to 
ensure type is supported before creation
-            new ExternalDataPrefix(configuration);
-        } catch (AlgebricksException ex) {
-            throw new 
CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
-        }
-
-        // Check if the bucket is present
-        S3Client s3Client = buildAwsS3Client(appCtx, configuration);
-        S3Response response;
-        boolean useOldApi = false;
-        String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        String prefix = getPrefix(configuration);
-
-        try {
-            response = S3Utils.isBucketEmpty(s3Client, container, prefix, 
false);
-        } catch (S3Exception ex) {
-            // Method not implemented, try falling back to old API
-            try {
-                // For error code, see 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                if 
(ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
-                    useOldApi = true;
-                    response = S3Utils.isBucketEmpty(s3Client, container, 
prefix, true);
-                } else {
-                    throw ex;
-                }
-            } catch (SdkException ex2) {
-                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2, 
getMessageOrToString(ex));
-            }
-        } catch (SdkException ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
-        } finally {
-            if (s3Client != null) {
-                CleanupUtils.close(s3Client, null);
-            }
-        }
-
-        boolean isEmpty = useOldApi ? ((ListObjectsResponse) 
response).contents().isEmpty()
-                : ((ListObjectsV2Response) response).contents().isEmpty();
-        if (isEmpty && collector.shouldWarn()) {
-            Warning warning = Warning.of(srcLoc, 
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-            collector.warn(warning);
-        }
-
-        // Returns 200 only in case the bucket exists, otherwise, throws an 
exception. However, to
-        // ensure coverage, check if the result is successful as well and not 
only catch exceptions
-        if (!response.sdkHttpResponse().isSuccessful()) {
-            throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
-        }
-        if (isDeltaTable(configuration)) {
-            try {
-                validateDeltaTableExists(appCtx, configuration);
-            } catch (AlgebricksException e) {
-                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e);
-            }
-        }
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
index 102c378cca..3cfef1e996 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -26,23 +26,7 @@ public class S3Constants {
     // Key max length
     public static final int MAX_KEY_LENGTH_IN_BYTES = 1024;
 
-    // Authentication specific parameters
-    public static final String REGION_FIELD_NAME = "region";
     public static final String PATH_STYLE_ADDRESSING_FIELD_NAME = 
"pathStyleAddressing";
-    public static final String CROSS_REGION_FIELD_NAME = "crossRegion";
-    public static final String INSTANCE_PROFILE_FIELD_NAME = "instanceProfile";
-    public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
-    public static final String SECRET_ACCESS_KEY_FIELD_NAME = 
"secretAccessKey";
-    public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
-    public static final String ROLE_ARN_FIELD_NAME = "roleArn";
-    public static final String EXTERNAL_ID_FIELD_NAME = "externalId";
-    public static final String SERVICE_END_POINT_FIELD_NAME = 
"serviceEndpoint";
-
-    // AWS S3 specific error codes
-    public static final String ERROR_INTERNAL_ERROR = "InternalError";
-    public static final String ERROR_SLOW_DOWN = "SlowDown";
-    public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
-    public static final String ERROR_EXPIRED_TOKEN = "ExpiredToken";
 
     /*
      * Hadoop-AWS
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 7783456871..e20e39f2bf 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -18,10 +18,50 @@
  */
 package org.apache.asterix.external.util.aws.s3;
 
+import static 
org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
+import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
+import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.ACCESS_KEY_ID_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.CROSS_REGION_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.ERROR_INTERNAL_ERROR;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.ERROR_METHOD_NOT_IMPLEMENTED;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.ERROR_SLOW_DOWN;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.EXTERNAL_ID_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.ROLE_ARN_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.SECRET_ACCESS_KEY_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.SERVICE_END_POINT_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.SESSION_TOKEN_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsUtils.buildCredentialsProvider;
+import static 
org.apache.asterix.external.util.aws.AwsUtils.getAuthenticationType;
+import static 
org.apache.asterix.external.util.aws.AwsUtils.validateAndGetCrossRegion;
+import static 
org.apache.asterix.external.util.aws.AwsUtils.validateAndGetRegion;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUMED_ROLE;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_ARN;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_EXTERNAL_ID;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INSTANCE_PROFILE;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_REGION;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SIMPLE;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMPORARY;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.PATH_STYLE_ADDRESSING_FIELD_NAME;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -38,13 +78,21 @@ import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.AwsUtils;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.api.util.CleanupUtils;
 
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.model.CommonPrefix;
 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
@@ -60,6 +108,241 @@ public class S3Utils {
         throw new AssertionError("do not instantiate");
     }
 
+    /**
+     * Creates an {@link S3Client} from the user-supplied external dataset configuration.
+     * <p>
+     * Region, cross-region access and the credentials provider are resolved through {@link AwsUtils}. An optional
+     * service endpoint overrides the default one, and path-style addressing is applied either as configured or,
+     * when not configured, whenever a non-empty custom endpoint is present.
+     *
+     * @param appCtx        application context, used to build the credentials provider
+     * @param configuration external dataset properties
+     * @return a newly built S3 client (callers in this class close it once done)
+     * @throws CompilationException if a property is invalid or the service endpoint cannot be parsed/applied
+     */
+    public static S3Client buildClient(IApplicationContext appCtx, Map<String, String> configuration)
+            throws CompilationException {
+        String endpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+        Region resolvedRegion = validateAndGetRegion(configuration.get(REGION_FIELD_NAME));
+        boolean crossRegionEnabled = validateAndGetCrossRegion(configuration.get(CROSS_REGION_FIELD_NAME));
+        AwsCredentialsProvider provider = buildCredentialsProvider(appCtx, configuration);
+
+        S3ClientBuilder clientBuilder = S3Client.builder().region(resolvedRegion)
+                .crossRegionAccessEnabled(crossRegionEnabled).credentialsProvider(provider);
+
+        if (endpoint != null) {
+            URI endpointUri;
+            try {
+                endpointUri = new URI(endpoint);
+            } catch (URISyntaxException ex) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
+                        String.format("Invalid service endpoint %s", endpoint));
+            }
+            try {
+                // NOTE(review): presumably the SDK rejects an unusable endpoint (e.g. one without a scheme)
+                // with a NullPointerException; it is surfaced as a compilation error - confirm against SDK
+                clientBuilder.endpointOverride(endpointUri);
+            } catch (NullPointerException ex) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+            }
+        }
+
+        String pathStyleSetting = configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME);
+        clientBuilder.forcePathStyle(validateAndGetPathStyleAddressing(pathStyleSetting, endpoint));
+        return clientBuilder.build();
+    }
+
+    /**
+     * Configures the given Hadoop job for S3A access without setting the S3 connection pool size.
+     *
+     * @param appCtx        application context
+     * @param conf          hadoop job configuration to populate
+     * @param configuration external dataset properties
+     * @throws CompilationException if a property is invalid or no valid authentication is provided
+     */
+    public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf conf,
+            Map<String, String> configuration) throws CompilationException {
+        configureAwsS3HdfsJobConf(appCtx, conf, configuration, 0);
+    }
+
+    /**
+     * Populates a Hadoop {@link JobConf} with the hadoop-aws (S3A) settings derived from the external dataset
+     * configuration: credentials, region, service endpoint, path-style access and, optionally, the S3
+     * connection pool size.
+     *
+     * @param appCtx             application context
+     * @param jobConf            hadoop job configuration to populate
+     * @param configuration      external dataset properties
+     * @param numberOfPartitions number of partitions in the cluster; a non-positive value leaves the
+     *                           connection pool size unset
+     * @throws CompilationException if a property is invalid or no valid authentication is provided
+     */
+    public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf jobConf,
+            Map<String, String> configuration, int numberOfPartitions) throws CompilationException {
+        setHadoopCredentials(appCtx, jobConf, configuration);
+        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+        Region region = validateAndGetRegion(configuration.get(REGION_FIELD_NAME));
+        jobConf.set(HADOOP_REGION, region.toString());
+
+        // A custom endpoint is passed through as-is (its validation is left to hadoop-aws); without one, the
+        // central endpoint is used, where the region is ignored and buckets can still be found.
+        jobConf.set(HADOOP_SERVICE_END_POINT,
+                serviceEndpoint != null ? serviceEndpoint : Constants.CENTRAL_ENDPOINT);
+
+        boolean pathStyleAddressing =
+                validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME), serviceEndpoint);
+        jobConf.set(HADOOP_PATH_STYLE_ACCESS,
+                pathStyleAddressing ? ExternalDataConstants.TRUE : ExternalDataConstants.FALSE);
+
+        // Size the S3 connection pool to the number of partitions, when a meaningful count is provided
+        if (numberOfPartitions > 0) {
+            jobConf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
+        }
+    }
+
+    /**
+     * Selects the hadoop-aws (S3A) credentials provider, and sets the credentials it needs, according to the
+     * authentication type resolved from the external dataset configuration.
+     *
+     * @param appCtx        application context, consulted for the assume-role session duration
+     * @param jobConf       hadoop job configuration to populate
+     * @param configuration external dataset properties
+     * @throws CompilationException if the configuration does not describe a valid authentication method
+     */
+    private static void setHadoopCredentials(IApplicationContext appCtx, JobConf jobConf,
+            Map<String, String> configuration) throws CompilationException {
+        AwsUtils.AuthenticationType authType = getAuthenticationType(configuration);
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        // NOTE(review): hadoop's Configuration.set is believed to reject null values; if any of the values
+        // below can be absent for the chosen authentication type this fails here - confirm upstream validation
+        switch (authType) {
+            case ANONYMOUS:
+                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS);
+                return;
+            case INSTANCE_PROFILE:
+                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_INSTANCE_PROFILE);
+                return;
+            case ACCESS_KEYS: {
+                String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_SIMPLE);
+                jobConf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
+                jobConf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
+                if (sessionToken != null) {
+                    // temporary credentials: replace the simple provider chosen above
+                    jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMPORARY);
+                    jobConf.set(HADOOP_SESSION_TOKEN, sessionToken);
+                }
+                return;
+            }
+            case ARN_ASSUME_ROLE: {
+                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ASSUMED_ROLE);
+                jobConf.set(HADOOP_ASSUME_ROLE_ARN, configuration.get(ROLE_ARN_FIELD_NAME));
+                jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, configuration.get(EXTERNAL_ID_FIELD_NAME));
+                jobConf.set(HADOOP_ASSUME_ROLE_SESSION_NAME, "parquet-" + UUID.randomUUID());
+                // hadoop only accepts 15m to 1h, so the configured duration is clamped into that range
+                int configuredSeconds = appCtx.getExternalProperties().getAwsAssumeRoleDuration();
+                jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, getHadoopDuration(configuredSeconds));
+                // TODO: this assumes basic keys always, also support if we use InstanceProfile to assume a role
+                jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY, HADOOP_SIMPLE);
+                jobConf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
+                jobConf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
+                return;
+            }
+            case BAD_AUTHENTICATION:
+                throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
+        }
+    }
+
+    /**
+     * Converts a configured assume-role duration into the format hadoop-aws accepts, which ranges from 15m to 1h.
+     * The value is clamped into [15 minutes, 1 hour] and truncated to whole minutes.
+     *
+     * @param seconds configured duration in seconds
+     * @return the clamped duration, e.g. {@code "15m"}, {@code "42m"} or {@code "1h"}
+     */
+    private static String getHadoopDuration(int seconds) {
+        final int lowerBoundSeconds = 15 * 60;
+        final int upperBoundSeconds = 60 * 60;
+        int clampedMinutes = Math.min(Math.max(seconds, lowerBoundSeconds), upperBoundSeconds) / 60;
+        return clampedMinutes == 60 ? "1h" : clampedMinutes + "m";
+    }
+
+    /**
+     * Validates the properties of an S3 external dataset and checks that its container can be accessed.
+     * <p>
+     * Delta tables get their own property validation; any other dataset must declare a format. Include/exclude
+     * patterns and the computed-field prefix are validated, then the container is probed by checking for a single
+     * object, falling back to the old list API when the service answers {@code NotImplemented}. A warning is
+     * issued when the probe finds no files, and for Delta tables the table's existence is verified.
+     *
+     * @param appCtx        application context
+     * @param configuration external dataset properties
+     * @param srcLoc        source location attached to errors and warnings
+     * @param collector     receives the "no files" warning
+     * @throws CompilationException if a property is invalid, the container cannot be accessed or found, or the
+     *                              Delta table cannot be validated
+     */
+    public static void validateProperties(IApplicationContext appCtx, Map<String, String> configuration,
+            SourceLocation srcLoc, IWarningCollector collector) throws CompilationException {
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableProperties(configuration);
+        } else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            // non-Delta datasets must declare their format
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+        }
+
+        validateIncludeExclude(configuration);
+        try {
+            // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation
+            new ExternalDataPrefix(configuration);
+        } catch (AlgebricksException ex) {
+            throw new CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
+        }
+
+        // Check if the bucket is present. Resolve everything else first so that nothing can throw between
+        // building the client and the finally block that closes it.
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        String prefix = getPrefix(configuration);
+        S3Client s3Client = buildClient(appCtx, configuration);
+        S3Response response;
+        boolean useOldApi = false;
+        try {
+            response = S3Utils.isBucketEmpty(s3Client, container, prefix, false);
+        } catch (S3Exception ex) {
+            // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+            // Only "method not implemented" falls back to the old API; error details may be missing
+            boolean methodNotImplemented = ex.awsErrorDetails() != null
+                    && ERROR_METHOD_NOT_IMPLEMENTED.equals(ex.awsErrorDetails().errorCode());
+            if (!methodNotImplemented) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+            }
+            useOldApi = true;
+            try {
+                response = S3Utils.isBucketEmpty(s3Client, container, prefix, true);
+            } catch (SdkException fallbackEx) {
+                // report the failure of the fallback call itself, matching the attached cause
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, fallbackEx,
+                        getMessageOrToString(fallbackEx));
+            }
+        } catch (SdkException ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+        } finally {
+            CleanupUtils.close(s3Client, null);
+        }
+
+        // Returns 200 only in case the bucket exists, otherwise, throws an exception. However, to
+        // ensure coverage, check if the result is successful as well and not only catch exceptions.
+        // This is checked before the "no files" warning so a missing container is not also reported as empty.
+        if (!response.sdkHttpResponse().isSuccessful()) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
+        }
+
+        boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty()
+                : ((ListObjectsV2Response) response).contents().isEmpty();
+        if (isEmpty && collector.shouldWarn()) {
+            Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+            collector.warn(warning);
+        }
+
+        if (isDeltaTable(configuration)) {
+            try {
+                validateDeltaTableExists(appCtx, configuration);
+            } catch (AlgebricksException e) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e);
+            }
+        }
+    }
+
     /**
      * Checks for a single object in the specified bucket to determine if the 
bucket is empty or not.
      *
@@ -96,7 +379,7 @@ public class S3Utils {
         // Prepare to retrieve the objects
         List<S3Object> filesOnly;
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx, 
configuration);
+        S3Client s3Client = buildClient(appCtx, configuration);
         String prefix = getPrefix(configuration);
 
         try {
@@ -243,7 +526,7 @@ public class S3Utils {
             Map<String, String> configuration, String container, String prefix)
             throws CompilationException, HyracksDataException {
         // create s3 client
-        S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx, 
configuration);
+        S3Client s3Client = buildClient(appCtx, configuration);
         // fetch all the s3 objects
         return listS3ObjectsOfSingleDepth(s3Client, container, prefix);
     }
@@ -300,12 +583,26 @@ public class S3Utils {
                 + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
     }
 
-    /**
-     * Generates a random external ID to be used in cross-account role 
assumption.
-     *
-     * @return external id
-     */
-    public static String generateExternalId() {
-        return UUID.randomUUID().toString();
+    /**
+     * Tells whether an S3 error code denotes a failure worth retrying, i.e. {@code ERROR_INTERNAL_ERROR} or
+     * {@code ERROR_SLOW_DOWN}.
+     *
+     * @param errorCode the AWS error code; may be {@code null} when the service supplied no error details
+     * @return {@code true} for a retryable error code, {@code false} otherwise (including {@code null})
+     */
+    public static boolean isRetryableError(String errorCode) {
+        // constant-first comparison so a missing error code is simply "not retryable" instead of an NPE
+        return ERROR_INTERNAL_ERROR.equals(errorCode) || ERROR_SLOW_DOWN.equals(errorCode);
+    }
+
+    /**
+     * Resolves the effective path-style addressing flag.
+     * <p>
+     * If the property is absent, path-style addressing is used exactly when a non-empty custom endpoint is
+     * configured; otherwise the property must be "true" or "false" (case-insensitive) and its value is returned.
+     *
+     * @param pathStyleAddressing raw property value, possibly {@code null}
+     * @param endpoint            configured service endpoint, possibly {@code null}
+     * @return whether path-style addressing should be used
+     * @throws CompilationException if the property is present but is not a boolean literal
+     */
+    public static boolean validateAndGetPathStyleAddressing(String pathStyleAddressing, String endpoint)
+            throws CompilationException {
+        if (pathStyleAddressing != null) {
+            validatePathStyleAddressing(pathStyleAddressing);
+            return Boolean.parseBoolean(pathStyleAddressing);
+        }
+        boolean hasCustomEndpoint = endpoint != null && !endpoint.isEmpty();
+        return hasCustomEndpoint;
+    }
+
+    /**
+     * Checks that an explicitly provided path-style addressing value is a boolean literal.
+     *
+     * @param pathStyleAddressing raw property value; {@code null} (property not set) is accepted
+     * @throws CompilationException if the value is neither "true" nor "false" (case-insensitive)
+     */
+    public static void validatePathStyleAddressing(String pathStyleAddressing) throws CompilationException {
+        boolean acceptable = pathStyleAddressing == null || "true".equalsIgnoreCase(pathStyleAddressing)
+                || "false".equalsIgnoreCase(pathStyleAddressing);
+        if (!acceptable) {
+            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, PATH_STYLE_ADDRESSING_FIELD_NAME,
+                    "true, false");
+        }
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
index 91afbd8279..2f8791ed41 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.input.record.reader.awss3;
 
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.ERROR_INTERNAL_ERROR;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.ERROR_SLOW_DOWN;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;

Reply via email to