This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d21a38bf55 [ASTERIXDB-3514][EXT]: Assume role only when temporary 
credentials expire
d21a38bf55 is described below

commit d21a38bf553a2575741ffef438b4959cee6bada4
Author: Hussain Towaileb <[email protected]>
AuthorDate: Wed Nov 13 21:56:54 2024 +0300

    [ASTERIXDB-3514][EXT]: Assume role only when temporary credentials expire
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    When using temporary credentials from assuming
    a role, cache the credentials and only refresh
    them when they expired.
    
    Ext-ref: MB-63505
    Change-Id: I622853b794f81cd4bda84964a6cf7041d889d20f
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19067
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Peeyush Gupta <[email protected]>
    Reviewed-by: Hussain Towaileb <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Hussain Towaileb <[email protected]>
---
 .../asterix/app/cc/CcApplicationContext.java       |  18 +
 .../app/external/ExternalCredentialsCache.java     |  97 ++++
 .../external/ExternalCredentialsCacheUpdater.java  | 153 ++++++
 .../app/message/RefreshAwsCredentialsRequest.java  |  81 +++
 .../app/message/RefreshAwsCredentialsResponse.java |  73 +++
 .../message/UpdateAwsCredentialsCacheRequest.java  |  46 ++
 .../apache/asterix/app/nc/NCAppRuntimeContext.java |  18 +
 .../AbstractCloudExternalFileWriterFactory.java    |  11 +-
 .../cloud/writer/GCSExternalFileWriterFactory.java |   5 +-
 .../cloud/writer/S3ExternalFileWriterFactory.java  |   9 +-
 .../asterix/common/api/IApplicationContext.java    |  12 +
 .../asterix/common/exceptions/ErrorCode.java       |   2 +
 .../common/external/IExternalCredentialsCache.java |  48 ++
 .../IExternalCredentialsCacheUpdater.java}         |  17 +-
 .../src/main/resources/asx_errormsg/en.properties  |   2 +
 .../input/record/reader/aws/AwsS3InputStream.java  |  11 +-
 .../record/reader/aws/AwsS3InputStreamFactory.java |  10 +-
 .../aws/parquet/AwsS3ParquetReaderFactory.java     |   6 +-
 .../external/util/ExternalDataConstants.java       |   1 +
 .../asterix/external/util/ExternalDataUtils.java   |   4 +-
 .../util/aws/s3/{S3Utils.java => S3AuthUtils.java} | 558 +++++++--------------
 .../asterix/external/util/aws/s3/S3Utils.java      | 391 +--------------
 .../writer/HDFSExternalFileWriterFactory.java      |   3 +-
 .../writer/LocalFSExternalFileWriterFactory.java   |   3 +-
 .../metadata/declared/MetadataProvider.java        |   1 +
 .../metadata/provider/ExternalWriterProvider.java  |   2 +-
 .../writer/IExternalWriterFactoryValidator.java    |   3 +-
 27 files changed, 793 insertions(+), 792 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index 6ea7aebf83..e892d0445e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
+import org.apache.asterix.app.external.ExternalCredentialsCache;
+import org.apache.asterix.app.external.ExternalCredentialsCacheUpdater;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IConfigValidator;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
@@ -55,6 +57,8 @@ import 
org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
 import org.apache.asterix.common.external.IAdapterFactoryService;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.metadata.IMetadataLockUtil;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
@@ -127,6 +131,8 @@ public class CcApplicationContext implements 
ICcApplicationContext {
     private final IOManager ioManager;
     private final INamespacePathResolver namespacePathResolver;
     private final INamespaceResolver namespaceResolver;
+    private final IExternalCredentialsCache externalCredentialsCache;
+    private final IExternalCredentialsCacheUpdater 
externalCredentialsCacheUpdater;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, 
HyracksConnection hcc,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, 
IGlobalRecoveryManager globalRecoveryManager,
@@ -177,6 +183,8 @@ public class CcApplicationContext implements 
ICcApplicationContext {
         this.globalTxManager = globalTxManager;
         this.ioManager = ioManager;
         dataPartitioningProvider = DataPartitioningProvider.create(this);
+        externalCredentialsCache = new ExternalCredentialsCache();
+        externalCredentialsCacheUpdater = new 
ExternalCredentialsCacheUpdater(this);
     }
 
     @Override
@@ -415,4 +423,14 @@ public class CcApplicationContext implements 
ICcApplicationContext {
     public IOManager getIoManager() {
         return ioManager;
     }
+
+    @Override
+    public IExternalCredentialsCache getExternalCredentialsCache() {
+        return externalCredentialsCache;
+    }
+
+    @Override
+    public IExternalCredentialsCacheUpdater 
getExternalCredentialsCacheUpdater() {
+        return externalCredentialsCacheUpdater;
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
new file mode 100644
index 0000000000..0ddca4e700
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.metadata.MetadataConstants;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.util.Span;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+public class ExternalCredentialsCache implements IExternalCredentialsCache {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ConcurrentMap<String, Pair<Span, Object>> cache = new 
ConcurrentHashMap<>();
+
+    public ExternalCredentialsCache() {
+    }
+
+    @Override
+    public synchronized Object getCredentials(Map<String, String> 
configuration) {
+        String name = getName(configuration);
+        if (cache.containsKey(name) && 
!needsRefresh(cache.get(name).getLeft())) {
+            return cache.get(name).getRight();
+        }
+        return null;
+    }
+
+    @Override
+    public synchronized void updateCache(Map<String, String> configuration, 
Map<String, String> credentials) {
+        String type = configuration.get(ExternalDataConstants.KEY_READER);
+        if 
(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
+            updateAwsCache(configuration, credentials);
+        }
+    }
+
+    @Override
+    public String getName(Map<String, String> configuration) {
+        String database = 
configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE);
+        if (database == null) {
+            database = MetadataConstants.DEFAULT_DATABASE;
+        }
+        String dataverse = 
configuration.get(ExternalDataConstants.KEY_DATASET_DATAVERSE);
+        String dataset = configuration.get(ExternalDataConstants.KEY_DATASET);
+        return String.join(".", database, dataverse, dataset);
+    }
+
+    private void updateAwsCache(Map<String, String> configuration, Map<String, 
String> credentials) {
+        String accessKeyId = 
credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = 
credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
+        String sessionToken = 
credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
+        doUpdateAwsCache(configuration, 
AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
+    }
+
+    private void doUpdateAwsCache(Map<String, String> configuration, 
AwsSessionCredentials credentials) {
+        // TODO(htowaileb): Set default expiration value
+        String name = getName(configuration);
+        cache.put(name, Pair.of(Span.start(15, TimeUnit.MINUTES), 
credentials));
+        LOGGER.info("Received and cached new credentials for {}", name);
+    }
+
+    /**
+     * Refresh if the remaining time is half or less than the total expiration 
time
+     *
+     * @param span expiration span
+     * @return true if the remaining time is half or less than the total 
expiration time, false otherwise
+     */
+    private boolean needsRefresh(Span span) {
+        // TODO(htowaileb): At what % (and should be configurable?) do we 
decide it's better to refresh credentials
+        return (double) span.remaining(TimeUnit.MINUTES) / 
span.getSpan(TimeUnit.MINUTES) < 0.5;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
new file mode 100644
index 0000000000..f07caaae97
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import static 
org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+import static 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE;
+import static 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED;
+import static 
org.apache.asterix.common.exceptions.ErrorCode.REJECT_BAD_CLUSTER_STATE;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.message.RefreshAwsCredentialsRequest;
+import org.apache.asterix.app.message.RefreshAwsCredentialsResponse;
+import org.apache.asterix.app.message.UpdateAwsCredentialsCacheRequest;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+public class ExternalCredentialsCacheUpdater implements 
IExternalCredentialsCacheUpdater {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final IApplicationContext appCtx;
+
+    public ExternalCredentialsCacheUpdater(IApplicationContext appCtx) {
+        this.appCtx = appCtx;
+    }
+
+    @Override
+    public synchronized Object generateAndCacheCredentials(Map<String, String> 
configuration)
+            throws HyracksDataException, CompilationException {
+        IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
+        Object credentials = cache.getCredentials(configuration);
+        if (credentials != null) {
+            return credentials;
+        }
+
+        /*
+         * if we are the CC, generate new creds and ask all NCs to update 
their cache
+         * if we are the NC, send a message to the CC to generate new creds 
and ask all NCs to update their cache
+         */
+        String name = cache.getName(configuration);
+        if (appCtx instanceof ICcApplicationContext) {
+            ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+            IClusterManagementWork.ClusterState state = 
ccAppCtx.getClusterStateManager().getState();
+            if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
+                throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, 
state);
+            }
+
+            String accessKeyId;
+            String secretAccessKey;
+            String sessionToken;
+            Map<String, String> credentialsMap = new HashMap<>();
+            try {
+                LOGGER.info("attempting to update credentials for {}", name);
+                AwsCredentialsProvider newCredentials = 
S3AuthUtils.assumeRoleAndGetCredentials(configuration);
+                LOGGER.info("updated credentials successfully for {}", name);
+                AwsSessionCredentials sessionCredentials = 
(AwsSessionCredentials) newCredentials.resolveCredentials();
+                accessKeyId = sessionCredentials.accessKeyId();
+                secretAccessKey = sessionCredentials.secretAccessKey();
+                sessionToken = sessionCredentials.sessionToken();
+            } catch (CompilationException ex) {
+                LOGGER.info("failed to refresh credentials for {}", name, ex);
+                throw ex;
+            }
+
+            // credentials need refreshing
+            credentialsMap.put(S3Constants.ACCESS_KEY_ID_FIELD_NAME, 
accessKeyId);
+            credentialsMap.put(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME, 
secretAccessKey);
+            credentialsMap.put(S3Constants.SESSION_TOKEN_FIELD_NAME, 
sessionToken);
+
+            // request all NCs to update their credentials cache with the 
latest creds
+            updateNcsCredentialsCache(ccAppCtx, name, credentialsMap, 
configuration);
+            cache.updateCache(configuration, credentialsMap);
+            credentials = AwsSessionCredentials.create(accessKeyId, 
secretAccessKey, sessionToken);
+        } else {
+            NCMessageBroker broker = (NCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+            MessageFuture messageFuture = broker.registerMessageFuture();
+            String nodeId = ((INCServiceContext) 
appCtx.getServiceContext()).getNodeId();
+            long futureId = messageFuture.getFutureId();
+            RefreshAwsCredentialsRequest request = new 
RefreshAwsCredentialsRequest(nodeId, futureId, configuration);
+            try {
+                LOGGER.info("no valid credentials found for {}, requesting 
credentials from CC", name);
+                broker.sendMessageToPrimaryCC(request);
+                RefreshAwsCredentialsResponse response = 
(RefreshAwsCredentialsResponse) messageFuture
+                        .get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                if (response.getFailure() != null) {
+                    throw HyracksDataException.create(response.getFailure());
+                }
+                credentials = 
AwsSessionCredentials.create(response.getAccessKeyId(), 
response.getSecretAccessKey(),
+                        response.getSessionToken());
+            } catch (Exception ex) {
+                LOGGER.info("failed to refresh credentials for {}", name, ex);
+                throw HyracksDataException.create(ex);
+            } finally {
+                broker.deregisterMessageFuture(futureId);
+            }
+        }
+        return credentials;
+    }
+
+    private void updateNcsCredentialsCache(ICcApplicationContext appCtx, 
String name, Map<String, String> credentials,
+            Map<String, String> configuration) throws HyracksDataException {
+        final List<String> ncs = new 
ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
+        CCMessageBroker broker = (CCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        UpdateAwsCredentialsCacheRequest request = new 
UpdateAwsCredentialsCacheRequest(configuration, credentials);
+
+        try {
+            LOGGER.info("requesting all NCs to update their credentials for 
{}", name);
+            for (String nc : ncs) {
+                broker.sendApplicationMessageToNC(request, nc);
+            }
+        } catch (Exception e) {
+            LOGGER.info("failed to send message to nc", e);
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java
new file mode 100644
index 0000000000..32de92d45b
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+public class RefreshAwsCredentialsRequest implements ICcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final String nodeId;
+    private final long reqId;
+    private final Map<String, String> configuration;
+
+    public RefreshAwsCredentialsRequest(String nodeId, long reqId, Map<String, 
String> configuration) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public final void handle(ICcApplicationContext appCtx) throws 
HyracksDataException {
+        try {
+            IExternalCredentialsCacheUpdater cacheUpdater = 
appCtx.getExternalCredentialsCacheUpdater();
+            Object credentials = 
cacheUpdater.generateAndCacheCredentials(configuration);
+            AwsSessionCredentials sessionCredentials = (AwsSessionCredentials) 
credentials;
+
+            // respond with the credentials
+            RefreshAwsCredentialsResponse response =
+                    new RefreshAwsCredentialsResponse(reqId, 
sessionCredentials.accessKeyId(),
+                            sessionCredentials.secretAccessKey(), 
sessionCredentials.sessionToken(), null);
+            respond(appCtx, response);
+        } catch (Exception e) {
+            LOGGER.info("failed to refresh credentials", e);
+            RefreshAwsCredentialsResponse response = new 
RefreshAwsCredentialsResponse(reqId, null, null, null, e);
+            respond(appCtx, response);
+        }
+    }
+
+    private void respond(ICcApplicationContext appCtx, 
RefreshAwsCredentialsResponse response)
+            throws HyracksDataException {
+        CCMessageBroker broker = (CCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        try {
+            broker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            LOGGER.info("failed to send reply to nc", e);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java
new file mode 100644
index 0000000000..9ea0e1165b
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+
+public class RefreshAwsCredentialsResponse implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final String accessKeyId;
+    private final String secretAccessKey;
+    private final String sessionToken;
+    private final Throwable failure;
+
+    public RefreshAwsCredentialsResponse(long reqId, String accessKeyId, 
String secretAccessKey, String sessionToken,
+            Throwable failure) {
+        this.reqId = reqId;
+        this.accessKeyId = accessKeyId;
+        this.secretAccessKey = secretAccessKey;
+        this.sessionToken = sessionToken;
+        this.failure = failure;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) {
+        NCMessageBroker mb = (NCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(reqId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+
+    public String getAccessKeyId() {
+        return accessKeyId;
+    }
+
+    public String getSecretAccessKey() {
+        return secretAccessKey;
+    }
+
+    public String getSessionToken() {
+        return sessionToken;
+    }
+
+    public Throwable getFailure() {
+        return failure;
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
new file mode 100644
index 0000000000..44d4c21305
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+
+public class UpdateAwsCredentialsCacheRequest implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Map<String, String> configuration;
+    private final Map<String, String> credentials;
+
+    public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, 
Map<String, String> credentials) {
+        this.configuration = configuration;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) {
+        appCtx.getExternalCredentialsCache().updateCache(configuration, 
credentials);
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 7da3838838..8c2a0ab95d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -32,6 +32,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.app.external.ExternalCredentialsCache;
+import org.apache.asterix.app.external.ExternalCredentialsCacheUpdater;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.cloud.CloudConfigurator;
 import org.apache.asterix.cloud.LocalPartitionBootstrapper;
@@ -63,6 +65,8 @@ import 
org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.context.DiskWriteRateLimiterProvider;
 import org.apache.asterix.common.context.GlobalVirtualBufferCache;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
@@ -186,6 +190,8 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     private final INamespacePathResolver namespacePathResolver;
     private final INamespaceResolver namespaceResolver;
     private IDiskCacheMonitoringService diskCacheService;
+    protected IExternalCredentialsCache externalCredentialsCache;
+    protected IExternalCredentialsCacheUpdater externalCredentialsCacheUpdater;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, 
NCExtensionManager extensionManager,
             IPropertiesFactory propertiesFactory, INamespaceResolver 
namespaceResolver,
@@ -210,6 +216,8 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         cacheManager = new CacheManager();
         this.namespacePathResolver = namespacePathResolver;
         this.namespaceResolver = namespaceResolver;
+        this.externalCredentialsCache = new ExternalCredentialsCache();
+        this.externalCredentialsCacheUpdater = new 
ExternalCredentialsCacheUpdater(this);
     }
 
     @Override
@@ -748,4 +756,14 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         return isCloudDeployment() ? 
storageProperties.getStoragePartitionsCount()
                 : ncServiceContext.getIoManager().getIODevices().size();
     }
+
+    @Override
+    public IExternalCredentialsCache getExternalCredentialsCache() {
+        return externalCredentialsCache;
+    }
+
+    @Override
+    public IExternalCredentialsCacheUpdater 
getExternalCredentialsCacheUpdater() {
+        return externalCredentialsCacheUpdater;
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index 589ee7904d..198b3ad81b 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -30,6 +30,7 @@ import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.WriterSingleBufferProvider;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -60,17 +61,17 @@ abstract class AbstractCloudExternalFileWriterFactory 
implements IExternalFileWr
         writeBufferSize = externalConfig.getWriteBufferSize();
     }
 
-    abstract ICloudClient createCloudClient() throws CompilationException;
+    abstract ICloudClient createCloudClient(IApplicationContext appCtx) throws 
CompilationException;
 
     abstract boolean isNoContainerFoundException(IOException e);
 
     abstract boolean isSdkException(Throwable e);
 
-    final void buildClient() throws HyracksDataException {
+    final void buildClient(IApplicationContext appCtx) throws 
HyracksDataException {
         try {
             synchronized (this) {
                 if (cloudClient == null) {
-                    cloudClient = createCloudClient();
+                    cloudClient = createCloudClient(appCtx);
                 }
             }
         } catch (CompilationException e) {
@@ -79,8 +80,8 @@ abstract class AbstractCloudExternalFileWriterFactory 
implements IExternalFileWr
     }
 
     @Override
-    public final void validate() throws AlgebricksException {
-        ICloudClient testClient = createCloudClient();
+    public final void validate(IApplicationContext appCtx) throws 
AlgebricksException {
+        ICloudClient testClient = createCloudClient(appCtx);
         String bucket = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
 
         if (bucket == null || bucket.isEmpty()) {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 886f20d12d..63a8366c3c 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -24,6 +24,7 @@ import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
 import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.google.gcs.GCSUtils;
@@ -61,7 +62,7 @@ public final class GCSExternalFileWriterFactory extends 
AbstractCloudExternalFil
     }
 
     @Override
-    ICloudClient createCloudClient() throws CompilationException {
+    ICloudClient createCloudClient(IApplicationContext appCtx) throws 
CompilationException {
         GCSClientConfig config = GCSClientConfig.of(configuration, 
writeBufferSize);
         return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
                 ICloudGuardian.NoOpCloudGuardian.INSTANCE);
@@ -80,7 +81,7 @@ public final class GCSExternalFileWriterFactory extends 
AbstractCloudExternalFil
     @Override
     public IExternalFileWriter createWriter(IHyracksTaskContext context, 
IExternalPrinterFactory printerFactory)
             throws HyracksDataException {
-        buildClient();
+        buildClient(((IApplicationContext) 
context.getJobletContext().getServiceContext().getApplicationContext()));
         String bucket = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         IExternalPrinter printer = printerFactory.createPrinter();
         IWarningCollector warningCollector = context.getWarningCollector();
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index e07acc0246..d268efd212 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -24,9 +24,10 @@ import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
@@ -61,9 +62,9 @@ public final class S3ExternalFileWriterFactory extends 
AbstractCloudExternalFile
     }
 
     @Override
-    ICloudClient createCloudClient() throws CompilationException {
+    ICloudClient createCloudClient(IApplicationContext appCtx) throws 
CompilationException {
         S3ClientConfig config = S3ClientConfig.of(configuration, 
writeBufferSize);
-        return new S3CloudClient(config, 
S3Utils.buildAwsS3Client(configuration),
+        return new S3CloudClient(config, S3AuthUtils.buildAwsS3Client(appCtx, 
configuration),
                 ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 
@@ -80,7 +81,7 @@ public final class S3ExternalFileWriterFactory extends 
AbstractCloudExternalFile
     @Override
     public IExternalFileWriter createWriter(IHyracksTaskContext context, 
IExternalPrinterFactory printerFactory)
             throws HyracksDataException {
-        buildClient();
+        buildClient(((IApplicationContext) 
context.getJobletContext().getServiceContext().getApplicationContext()));
         String bucket = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         IExternalPrinter printer = printerFactory.createPrinter();
         IWarningCollector warningCollector = context.getWarningCollector();
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index eabebf7b3a..19e4ad7ab7 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -29,6 +29,8 @@ import org.apache.asterix.common.config.NodeProperties;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -106,4 +108,14 @@ public interface IApplicationContext {
     INamespaceResolver getNamespaceResolver();
 
     INamespacePathResolver getNamespacePathResolver();
+
+    /**
+     * @return external credentials cache
+     */
+    IExternalCredentialsCache getExternalCredentialsCache();
+
+    /**
+     * @return external credentials cache updater
+     */
+    IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater();
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 0cf6eb2736..35e0699224 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -314,6 +314,8 @@ public enum ErrorCode implements IError {
     MAXIMUM_VALUE_ALLOWED_FOR_PARAM(1209),
     STORAGE_SIZE_NOT_APPLICABLE_TO_TYPE(1210),
     COULD_NOT_CREATE_TOKENS(1211),
+    NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION(1212),
+    FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION(1213),
 
     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
new file mode 100644
index 0000000000..245b350653
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import java.util.Map;
+
+public interface IExternalCredentialsCache {
+
+    /**
+     * Returns the cached credentials. Can be of any supported external 
credentials type
+     *
+     * @param configuration configuration containing external collection 
details
+     * @return credentials if present, null otherwise
+     */
+    Object getCredentials(Map<String, String> configuration);
+
+    /**
+     * Updates the credentials cache with the provided credentials for the 
specified name
+     *
+     * @param configuration configuration containing external collection 
details
+     * @param credentials credentials to cache
+     */
+    void updateCache(Map<String, String> configuration, Map<String, String> 
credentials);
+
+    /**
+     * Returns the name of the entity which the cached credentials belong to
+     *
+     * @param configuration configuration containing external collection 
details
+     * @return name of entity which credentials belong to
+     */
+    String getName(Map<String, String> configuration);
+}
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java
similarity index 61%
copy from 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
copy to 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java
index 4a75db6906..48553c0606 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java
@@ -16,13 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.runtime.writer;
+package org.apache.asterix.common.external;
 
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IExternalCredentialsCacheUpdater {
 
-public interface IExternalWriterFactoryValidator {
     /**
-     * Perform the necessary validation to ensure the writer has the proper 
permissions
+     * Generates new credentials and caches them
+     *
+     * @param configuration configuration containing external collection 
details
      */
-    void validate() throws AlgebricksException;
+    Object generateAndCacheCredentials(Map<String, String> configuration)
+            throws HyracksDataException, CompilationException;
 }
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index adb26fe585..d15a751b50 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -316,6 +316,8 @@
 1209 = Maximum value allowed for '%1$s' is %2$s. Found %3$s
 1210 = Retrieving storage size is not applicable to type: %1$s.
 1211 = Could not create delegation tokens
+1212 = No credentials found for cross-account authentication. Expected 
instance profile or access key id & secret access key for assuming role
+1213 = Failed to perform cross-account authentication. Encountered error : 
'%1$s'
 
 # Feed Errors
 3001 = Illegal state.
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 7a1bdad9cb..138b364295 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -26,13 +26,14 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
@@ -48,14 +49,16 @@ import software.amazon.awssdk.services.s3.model.S3Exception;
 public class AwsS3InputStream extends AbstractExternalInputStream {
 
     // Configuration
+    private final IApplicationContext ncAppCtx;
     private final String bucket;
     private final S3Client s3Client;
     private ResponseInputStream<?> s3InStream;
     private static final int MAX_RETRIES = 5; // We will retry 5 times in case 
of internal error from AWS S3 service
 
-    public AwsS3InputStream(Map<String, String> configuration, List<String> 
filePaths,
+    public AwsS3InputStream(IApplicationContext ncAppCtx, Map<String, String> 
configuration, List<String> filePaths,
             IExternalFilterValueEmbedder valueEmbedder) throws 
HyracksDataException {
         super(configuration, filePaths, valueEmbedder);
+        this.ncAppCtx = ncAppCtx;
         this.s3Client = buildAwsS3Client(configuration);
         this.bucket = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
@@ -113,7 +116,7 @@ public class AwsS3InputStream extends 
AbstractExternalInputStream {
     }
 
     private boolean shouldRetry(String errorCode, int currentRetry) {
-        return currentRetry < MAX_RETRIES && 
S3Utils.isRetryableError(errorCode);
+        return currentRetry < MAX_RETRIES && 
S3AuthUtils.isRetryableError(errorCode);
     }
 
     @Override
@@ -141,7 +144,7 @@ public class AwsS3InputStream extends 
AbstractExternalInputStream {
 
     private S3Client buildAwsS3Client(Map<String, String> configuration) 
throws HyracksDataException {
         try {
-            return S3Utils.buildAwsS3Client(configuration);
+            return S3AuthUtils.buildAwsS3Client(ncAppCtx, configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 36d21d1aaf..e9c72e31d2 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
@@ -47,8 +48,10 @@ public class AwsS3InputStreamFactory extends 
AbstractExternalInputStreamFactory
     public AsterixInputStream createInputStream(IExternalDataRuntimeContext 
context) throws HyracksDataException {
         IExternalFilterValueEmbedder valueEmbedder = 
context.getValueEmbedder();
         int partition = context.getPartition();
-        return new AwsS3InputStream(configuration, 
partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
-                valueEmbedder);
+        IApplicationContext ncAppCtx = (IApplicationContext) 
context.getTaskContext().getJobletContext()
+                .getServiceContext().getApplicationContext();
+        return new AwsS3InputStream(ncAppCtx, configuration,
+                partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(), 
valueEmbedder);
     }
 
     @Override
@@ -65,7 +68,8 @@ public class AwsS3InputStreamFactory extends 
AbstractExternalInputStreamFactory
         configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, 
externalDataPrefix.getRoot());
 
         // get the items
-        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, 
includeExcludeMatcher, warningCollector,
+        IApplicationContext appCtx = (IApplicationContext) 
ctx.getApplicationContext();
+        List<S3Object> filesOnly = S3Utils.listS3Objects(appCtx, 
configuration, includeExcludeMatcher, warningCollector,
                 externalDataPrefix, evaluator);
 
         // Distribute work load amongst the partitions
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index abed33afcb..7ddbab914e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.external.input.record.reader.aws.parquet;
 
-import static 
org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf;
+import static 
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
 import static org.apache.asterix.external.util.aws.s3.S3Utils.listS3Objects;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -73,7 +74,8 @@ public class AwsS3ParquetReaderFactory extends 
HDFSDataSourceFactory {
             configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, 
externalDataPrefix.getRoot());
 
             String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-            List<S3Object> filesOnly = listS3Objects(configuration, 
includeExcludeMatcher, warningCollector,
+            IApplicationContext appCtx = (IApplicationContext) 
serviceCtx.getApplicationContext();
+            List<S3Object> filesOnly = listS3Objects(appCtx, configuration, 
includeExcludeMatcher, warningCollector,
                     externalDataPrefix, evaluator);
             path = buildPathURIs(container, filesOnly);
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index e758f64f88..1de2cd252c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -39,6 +39,7 @@ public class ExternalDataConstants {
     // used to specify the stream factory for an adapter that has a stream 
data source
     public static final String KEY_STREAM = "stream";
     //TODO(DB): check adapter configuration
+    public static final String KEY_DATASET = "dataset";
     public static final String KEY_DATASET_DATABASE = "dataset-database";
     // used to specify the dataverse of the adapter
     public static final String KEY_DATASET_DATAVERSE = "dataset-dataverse";
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 8c72a8c4c3..1d5f2ed0d9 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -74,8 +74,8 @@ import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna
 import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;
 import org.apache.asterix.external.util.google.gcs.GCSConstants;
 import org.apache.asterix.om.types.ARecordType;
@@ -675,7 +675,7 @@ public class ExternalDataUtils {
 
         switch (type) {
             case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
-                S3Utils.validateProperties(configuration, srcLoc, collector);
+                S3AuthUtils.validateProperties(appCtx, configuration, srcLoc, 
collector);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
                 validateAzureBlobProperties(configuration, srcLoc, collector, 
appCtx);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
similarity index 57%
copy from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
copy to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 3cfccb47e2..2ba1844a86 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -51,22 +51,18 @@ import static 
org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.function.BiPredicate;
-import java.util.regex.Matcher;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.external.IExternalFilterEvaluator;
-import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
-import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.mapred.JobConf;
@@ -87,22 +83,17 @@ import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.s3.model.CommonPrefix;
-import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.S3Exception;
-import software.amazon.awssdk.services.s3.model.S3Object;
 import software.amazon.awssdk.services.s3.model.S3Response;
-import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
 import software.amazon.awssdk.services.sts.StsClient;
 import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
 import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
 import software.amazon.awssdk.services.sts.model.Credentials;
 
-public class S3Utils {
-    private S3Utils() {
+public class S3AuthUtils {
+    private S3AuthUtils() {
         throw new AssertionError("do not instantiate");
     }
 
@@ -117,15 +108,17 @@ public class S3Utils {
      * @return S3 client
      * @throws CompilationException CompilationException
      */
-    public static S3Client buildAwsS3Client(Map<String, String> configuration) 
throws CompilationException {
+    public static S3Client buildAwsS3Client(IApplicationContext appCtx, 
Map<String, String> configuration)
+            throws CompilationException {
         String regionId = configuration.get(REGION_FIELD_NAME);
         String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
 
         Region region = validateAndGetRegion(regionId);
-        AwsCredentialsProvider credentialsProvider = 
buildCredentialsProvider(configuration);
+        AwsCredentialsProvider credentialsProvider = 
buildCredentialsProvider(appCtx, configuration);
 
         S3ClientBuilder builder = S3Client.builder();
         builder.region(region);
+        builder.crossRegionAccessEnabled(true);
         builder.credentialsProvider(credentialsProvider);
 
         // Validate the service endpoint if present
@@ -146,8 +139,8 @@ public class S3Utils {
         return builder.build();
     }
 
-    public static AwsCredentialsProvider buildCredentialsProvider(Map<String, 
String> configuration)
-            throws CompilationException {
+    public static AwsCredentialsProvider 
buildCredentialsProvider(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
         String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
         String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
         String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
@@ -157,12 +150,11 @@ public class S3Utils {
         if (noAuth(configuration)) {
             return AnonymousCredentialsProvider.create();
         } else if (arnRole != null) {
-            // TODO: Do auth validation and use existing credentials if exist 
already, if not, assume the role
-            return validateAndGetTrustAccountAuthentication(configuration);
+            return getTrustAccountCredentials(appCtx, configuration);
         } else if (instanceProfile != null) {
-            return validateAndGetInstanceProfileAuthentication(configuration);
+            return getInstanceProfileCredentials(configuration);
         } else if (accessKeyId != null || secretAccessKey != null) {
-            return validateAndGetAccessKeysAuthentications(configuration);
+            return getAccessKeyCredentials(configuration);
         } else {
             if (externalId != null) {
                 throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
@@ -174,6 +166,166 @@ public class S3Utils {
         }
     }
 
+    public static Region validateAndGetRegion(String regionId) throws 
CompilationException {
+        List<Region> regions = S3Client.serviceMetadata().regions();
+        Optional<Region> selectedRegion = regions.stream().filter(region -> 
region.id().equals(regionId)).findFirst();
+
+        if (selectedRegion.isEmpty()) {
+            throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
+        }
+        return selectedRegion.get();
+    }
+
+    private static boolean noAuth(Map<String, String> configuration) {
+        return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, 
ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
+                ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, 
SESSION_TOKEN_FIELD_NAME) == null;
+    }
+
+    /**
+     * Returns the cached credentials if valid, otherwise, generates new 
credentials by assume a role
+     *
+     * @param appCtx application context
+     * @param configuration configuration
+     * @return returns the cached credentials if valid, otherwise, generates 
new credentials by assume a role
+     * @throws CompilationException CompilationException
+     */
+    public static AwsCredentialsProvider 
getTrustAccountCredentials(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
+        Object credentialsObject = cache.getCredentials(configuration);
+        if (credentialsObject != null) {
+            return () -> (AwsSessionCredentials) credentialsObject;
+        }
+        IExternalCredentialsCacheUpdater cacheUpdater = 
appCtx.getExternalCredentialsCacheUpdater();
+        AwsSessionCredentials credentials;
+        try {
+            credentials = (AwsSessionCredentials) 
cacheUpdater.generateAndCacheCredentials(configuration);
+        } catch (HyracksDataException ex) {
+            throw new 
CompilationException(ErrorCode.FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION, 
ex, ex.getMessage());
+        }
+
+        return () -> credentials;
+    }
+
+    /**
+     * Assume role using provided credentials and return the new credentials
+     *
+     * @param configuration configuration
+     * @return return credentials from the assume role
+     * @throws CompilationException CompilationException
+     */
+    public static AwsCredentialsProvider 
assumeRoleAndGetCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        String regionId = configuration.get(REGION_FIELD_NAME);
+        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
+        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
+        Region region = validateAndGetRegion(regionId);
+
+        AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder();
+        builder.roleArn(arnRole);
+        builder.roleSessionName(UUID.randomUUID().toString());
+        builder.durationSeconds(900); // TODO(htowaileb): configurable? Can be 
900 to 43200 (15 mins to 12 hours)
+        if (externalId != null) {
+            builder.externalId(externalId);
+        }
+
+        // credentials to be used to assume the role
+        AwsCredentialsProvider credentialsProvider;
+        AssumeRoleRequest request = builder.build();
+        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        if ("true".equalsIgnoreCase(instanceProfile)) {
+            credentialsProvider = getInstanceProfileCredentials(configuration, 
true);
+        } else if (accessKeyId != null && secretAccessKey != null) {
+            credentialsProvider = getAccessKeyCredentials(configuration, true);
+        } else {
+            throw new 
CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
+        }
+
+        // assume the role from the provided arn
+        try (StsClient stsClient =
+                
StsClient.builder().region(region).credentialsProvider(credentialsProvider).build())
 {
+            AssumeRoleResponse response = stsClient.assumeRole(request);
+            Credentials credentials = response.credentials();
+            return 
StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(),
+                    credentials.secretAccessKey(), 
credentials.sessionToken()));
+        } catch (SdkException ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
+        }
+    }
+
+    private static AwsCredentialsProvider 
getInstanceProfileCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        return getInstanceProfileCredentials(configuration, false);
+    }
+
+    private static AwsCredentialsProvider 
getInstanceProfileCredentials(Map<String, String> configuration,
+            boolean assumeRoleAuthentication) throws CompilationException {
+        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+
+        // only "true" value is allowed
+        if (!"true".equalsIgnoreCase(instanceProfile)) {
+            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
INSTANCE_PROFILE_FIELD_NAME, "true");
+        }
+
+        if (!assumeRoleAuthentication) {
+            String notAllowed = getNonNull(configuration, 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
+                    SESSION_TOKEN_FIELD_NAME);
+            if (notAllowed != null) {
+                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+                        INSTANCE_PROFILE_FIELD_NAME);
+            }
+        }
+        return InstanceProfileCredentialsProvider.create();
+    }
+
+    private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, 
String> configuration)
+            throws CompilationException {
+        return getAccessKeyCredentials(configuration, false);
+    }
+
+    private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, 
String> configuration,
+            boolean assumeRoleAuthentication) throws CompilationException {
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+
+        if (accessKeyId == null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
+                    SECRET_ACCESS_KEY_FIELD_NAME);
+        }
+        if (secretAccessKey == null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
SECRET_ACCESS_KEY_FIELD_NAME,
+                    ACCESS_KEY_ID_FIELD_NAME);
+        }
+
+        if (!assumeRoleAuthentication) {
+            String notAllowed = getNonNull(configuration, 
INSTANCE_PROFILE_FIELD_NAME, EXTERNAL_ID_FIELD_NAME);
+            if (notAllowed != null) {
+                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+                        INSTANCE_PROFILE_FIELD_NAME);
+            }
+        }
+
+        // use session token if provided
+        if (sessionToken != null) {
+            return StaticCredentialsProvider
+                    .create(AwsSessionCredentials.create(accessKeyId, 
secretAccessKey, sessionToken));
+        } else {
+            return 
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, 
secretAccessKey));
+        }
+    }
+
+    private static String getNonNull(Map<String, String> configuration, 
String... fieldNames) {
+        for (String fieldName : fieldNames) {
+            if (configuration.get(fieldName) != null) {
+                return fieldName;
+            }
+        }
+        return null;
+    }
+
     /**
      * Builds the S3 client using the provided configuration
      *
@@ -235,8 +387,8 @@ public class S3Utils {
      * @param configuration properties
      * @throws CompilationException Compilation exception
      */
-    public static void validateProperties(Map<String, String> configuration, 
SourceLocation srcLoc,
-            IWarningCollector collector) throws CompilationException {
+    public static void validateProperties(IApplicationContext appCtx, 
Map<String, String> configuration,
+            SourceLocation srcLoc, IWarningCollector collector) throws 
CompilationException {
         if (isDeltaTable(configuration)) {
             validateDeltaTableProperties(configuration);
         }
@@ -251,12 +403,7 @@ public class S3Utils {
         String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
 
         if (arnRole != null) {
-            String notAllowed = getNonNull(configuration, 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                    SESSION_TOKEN_FIELD_NAME);
-            if (notAllowed != null) {
-                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                        INSTANCE_PROFILE_FIELD_NAME);
-            }
+            return;
         } else if (externalId != null) {
             throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ROLE_ARN_FIELD_NAME,
                     EXTERNAL_ID_FIELD_NAME);
@@ -280,21 +427,21 @@ public class S3Utils {
         }
 
         // Check if the bucket is present
-        S3Client s3Client = buildAwsS3Client(configuration);
+        S3Client s3Client = buildAwsS3Client(appCtx, configuration);
         S3Response response;
         boolean useOldApi = false;
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         String prefix = getPrefix(configuration);
 
         try {
-            response = isBucketEmpty(s3Client, container, prefix, false);
+            response = S3Utils.isBucketEmpty(s3Client, container, prefix, 
false);
         } catch (S3Exception ex) {
             // Method not implemented, try falling back to old API
             try {
                 // For error code, see 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
                 if 
(ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
                     useOldApi = true;
-                    response = isBucketEmpty(s3Client, container, prefix, 
true);
+                    response = S3Utils.isBucketEmpty(s3Client, container, 
prefix, true);
                 } else {
                     throw ex;
                 }
@@ -322,349 +469,4 @@ public class S3Utils {
             throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
         }
     }
-
-    /**
-     * Checks for a single object in the specified bucket to determine if the 
bucket is empty or not.
-     *
-     * @param s3Client  s3 client
-     * @param container the container name
-     * @param prefix    Prefix to be used
-     * @param useOldApi flag whether to use the old API or not
-     * @return returns the S3 response
-     */
-    private static S3Response isBucketEmpty(S3Client s3Client, String 
container, String prefix, boolean useOldApi) {
-        S3Response response;
-        if (useOldApi) {
-            ListObjectsRequest.Builder listObjectsBuilder = 
ListObjectsRequest.builder();
-            listObjectsBuilder.prefix(prefix);
-            response = 
s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build());
-        } else {
-            ListObjectsV2Request.Builder listObjectsBuilder = 
ListObjectsV2Request.builder();
-            listObjectsBuilder.prefix(prefix);
-            response = 
s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
-        }
-        return response;
-    }
-
-    /**
-     * Returns the lists of S3 objects.
-     *
-     * @param configuration         properties
-     * @param includeExcludeMatcher include/exclude matchers to apply
-     */
-    public static List<S3Object> listS3Objects(Map<String, String> 
configuration,
-            AbstractExternalInputStreamFactory.IncludeExcludeMatcher 
includeExcludeMatcher,
-            IWarningCollector warningCollector, ExternalDataPrefix 
externalDataPrefix,
-            IExternalFilterEvaluator evaluator) throws CompilationException, 
HyracksDataException {
-        // Prepare to retrieve the objects
-        List<S3Object> filesOnly;
-        String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        S3Client s3Client = buildAwsS3Client(configuration);
-        String prefix = getPrefix(configuration);
-
-        try {
-            filesOnly = listS3Objects(s3Client, container, prefix, 
includeExcludeMatcher, externalDataPrefix, evaluator,
-                    warningCollector);
-        } catch (S3Exception ex) {
-            // New API is not implemented, try falling back to old API
-            try {
-                // For error code, see 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                if 
(ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
-                    filesOnly = oldApiListS3Objects(s3Client, container, 
prefix, includeExcludeMatcher,
-                            externalDataPrefix, evaluator, warningCollector);
-                } else {
-                    throw ex;
-                }
-            } catch (SdkException ex2) {
-                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2, 
getMessageOrToString(ex));
-            }
-        } catch (SdkException ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
-        } finally {
-            if (s3Client != null) {
-                CleanupUtils.close(s3Client, null);
-            }
-        }
-
-        // Warn if no files are returned
-        if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
-            Warning warning = Warning.of(null, 
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-            warningCollector.warn(warning);
-        }
-
-        return filesOnly;
-    }
-
-    /**
-     * Uses the latest API to retrieve the objects from the storage.
-     *
-     * @param s3Client              S3 client
-     * @param container             container name
-     * @param prefix                definition prefix
-     * @param includeExcludeMatcher include/exclude matchers to apply
-     */
-    private static List<S3Object> listS3Objects(S3Client s3Client, String 
container, String prefix,
-            AbstractExternalInputStreamFactory.IncludeExcludeMatcher 
includeExcludeMatcher,
-            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator 
evaluator,
-            IWarningCollector warningCollector) throws HyracksDataException {
-        String newMarker = null;
-        List<S3Object> filesOnly = new ArrayList<>();
-
-        ListObjectsV2Response listObjectsResponse;
-        ListObjectsV2Request.Builder listObjectsBuilder = 
ListObjectsV2Request.builder().bucket(container);
-        listObjectsBuilder.prefix(prefix);
-
-        while (true) {
-            // List the objects from the start, or from the last marker in 
case of truncated result
-            if (newMarker == null) {
-                listObjectsResponse = 
s3Client.listObjectsV2(listObjectsBuilder.build());
-            } else {
-                listObjectsResponse = 
s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
-            }
-
-            // Collect the paths to files only
-            collectAndFilterFiles(listObjectsResponse.contents(), 
includeExcludeMatcher.getPredicate(),
-                    includeExcludeMatcher.getMatchersList(), filesOnly, 
externalDataPrefix, evaluator,
-                    warningCollector);
-
-            // Mark the flag as done if done, otherwise, get the marker of the 
previous response for the next request
-            if (listObjectsResponse.isTruncated() != null && 
listObjectsResponse.isTruncated()) {
-                newMarker = listObjectsResponse.nextContinuationToken();
-            } else {
-                break;
-            }
-        }
-
-        return filesOnly;
-    }
-
-    /**
-     * Uses the old API (in case the new API is not implemented) to retrieve 
the objects from the storage
-     *
-     * @param s3Client              S3 client
-     * @param container             container name
-     * @param prefix                definition prefix
-     * @param includeExcludeMatcher include/exclude matchers to apply
-     */
-    private static List<S3Object> oldApiListS3Objects(S3Client s3Client, 
String container, String prefix,
-            AbstractExternalInputStreamFactory.IncludeExcludeMatcher 
includeExcludeMatcher,
-            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator 
evaluator,
-            IWarningCollector warningCollector) throws HyracksDataException {
-        String newMarker = null;
-        List<S3Object> filesOnly = new ArrayList<>();
-
-        ListObjectsResponse listObjectsResponse;
-        ListObjectsRequest.Builder listObjectsBuilder = 
ListObjectsRequest.builder().bucket(container);
-        listObjectsBuilder.prefix(prefix);
-
-        while (true) {
-            // List the objects from the start, or from the last marker in 
case of truncated result
-            if (newMarker == null) {
-                listObjectsResponse = 
s3Client.listObjects(listObjectsBuilder.build());
-            } else {
-                listObjectsResponse = 
s3Client.listObjects(listObjectsBuilder.marker(newMarker).build());
-            }
-
-            // Collect the paths to files only
-            collectAndFilterFiles(listObjectsResponse.contents(), 
includeExcludeMatcher.getPredicate(),
-                    includeExcludeMatcher.getMatchersList(), filesOnly, 
externalDataPrefix, evaluator,
-                    warningCollector);
-
-            // Mark the flag as done if done, otherwise, get the marker of the 
previous response for the next request
-            if (listObjectsResponse.isTruncated() != null && 
listObjectsResponse.isTruncated()) {
-                newMarker = listObjectsResponse.nextMarker();
-            } else {
-                break;
-            }
-        }
-
-        return filesOnly;
-    }
-
-    /**
-     * Collects only files that pass all tests
-     *
-     * @param s3Objects          s3 objects
-     * @param predicate          predicate
-     * @param matchers           matchers
-     * @param filesOnly          filtered files
-     * @param externalDataPrefix external data prefix
-     * @param evaluator          evaluator
-     */
-    private static void collectAndFilterFiles(List<S3Object> s3Objects, 
BiPredicate<List<Matcher>, String> predicate,
-            List<Matcher> matchers, List<S3Object> filesOnly, 
ExternalDataPrefix externalDataPrefix,
-            IExternalFilterEvaluator evaluator, IWarningCollector 
warningCollector) throws HyracksDataException {
-        for (S3Object object : s3Objects) {
-            if (ExternalDataUtils.evaluate(object.key(), predicate, matchers, 
externalDataPrefix, evaluator,
-                    warningCollector)) {
-                filesOnly.add(object);
-            }
-        }
-    }
-
-    public static Map<String, List<String>> S3ObjectsOfSingleDepth(Map<String, 
String> configuration, String container,
-            String prefix) throws CompilationException {
-        // create s3 client
-        S3Client s3Client = buildAwsS3Client(configuration);
-        // fetch all the s3 objects
-        return listS3ObjectsOfSingleDepth(s3Client, container, prefix);
-    }
-
-    /**
-     * Uses the latest API to retrieve the objects from the storage of a 
single level.
-     *
-     * @param s3Client              S3 client
-     * @param container             container name
-     * @param prefix                definition prefix
-     */
-    private static Map<String, List<String>> 
listS3ObjectsOfSingleDepth(S3Client s3Client, String container,
-            String prefix) {
-        Map<String, List<String>> allObjects = new HashMap<>();
-        ListObjectsV2Iterable listObjectsInterable;
-        ListObjectsV2Request.Builder listObjectsBuilder =
-                
ListObjectsV2Request.builder().bucket(container).prefix(prefix).delimiter("/");
-        listObjectsBuilder.prefix(prefix);
-        List<String> files = new ArrayList<>();
-        List<String> folders = new ArrayList<>();
-        // to skip the prefix as a file from the response
-        boolean checkPrefixInFile = true;
-        listObjectsInterable = 
s3Client.listObjectsV2Paginator(listObjectsBuilder.build());
-        for (ListObjectsV2Response response : listObjectsInterable) {
-            // put all the files
-            for (S3Object object : response.contents()) {
-                String fileName = object.key();
-                fileName = fileName.substring(prefix.length(), 
fileName.length());
-                if (checkPrefixInFile) {
-                    if (prefix.equals(object.key()))
-                        checkPrefixInFile = false;
-                    else {
-                        files.add(fileName);
-                    }
-                } else {
-                    files.add(fileName);
-                }
-            }
-            // put all the folders
-            for (CommonPrefix object : response.commonPrefixes()) {
-                String folderName = object.prefix();
-                folderName = folderName.substring(prefix.length(), 
folderName.length());
-                folders.add(folderName.endsWith("/") ? folderName.substring(0, 
folderName.length() - 1) : folderName);
-            }
-        }
-        allObjects.put("files", files);
-        allObjects.put("folders", folders);
-        return allObjects;
-    }
-
-    public static Region validateAndGetRegion(String regionId) throws 
CompilationException {
-        List<Region> regions = S3Client.serviceMetadata().regions();
-        Optional<Region> selectedRegion = regions.stream().filter(region -> 
region.id().equals(regionId)).findFirst();
-
-        if (selectedRegion.isEmpty()) {
-            throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
-        }
-        return selectedRegion.get();
-    }
-
-    // TODO(htowaileb): Currently, trust-account is always assuming we have 
instance profile setup in place
-    private static AwsCredentialsProvider 
validateAndGetTrustAccountAuthentication(Map<String, String> configuration)
-            throws CompilationException {
-        String notAllowed = getNonNull(configuration, 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                SESSION_TOKEN_FIELD_NAME);
-        if (notAllowed != null) {
-            throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                    INSTANCE_PROFILE_FIELD_NAME);
-        }
-
-        String regionId = configuration.get(REGION_FIELD_NAME);
-        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
-        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
-        Region region = validateAndGetRegion(regionId);
-
-        AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder();
-        builder.roleArn(arnRole);
-        builder.roleSessionName(UUID.randomUUID().toString());
-        builder.durationSeconds(900); // minimum role assume duration = 900 
seconds (15 minutes), make configurable?
-        if (externalId != null) {
-            builder.externalId(externalId);
-        }
-        AssumeRoleRequest request = builder.build();
-        AwsCredentialsProvider credentialsProvider = 
validateAndGetInstanceProfileAuthentication(configuration);
-
-        // TODO(htowaileb): We shouldn't assume role with each request, rather 
stored the received temporary credentials
-        // and refresh when expired.
-        // assume the role from the provided arn
-        try (StsClient stsClient =
-                
StsClient.builder().region(region).credentialsProvider(credentialsProvider).build())
 {
-            AssumeRoleResponse response = stsClient.assumeRole(request);
-            Credentials credentials = response.credentials();
-            return 
StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(),
-                    credentials.secretAccessKey(), 
credentials.sessionToken()));
-        } catch (SdkException ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
-        }
-    }
-
-    private static AwsCredentialsProvider 
validateAndGetInstanceProfileAuthentication(Map<String, String> configuration)
-            throws CompilationException {
-        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-
-        // only "true" value is allowed
-        if (!"true".equalsIgnoreCase(instanceProfile)) {
-            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
INSTANCE_PROFILE_FIELD_NAME, "true");
-        }
-
-        String notAllowed = getNonNull(configuration, 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                SESSION_TOKEN_FIELD_NAME);
-        if (notAllowed != null) {
-            throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                    INSTANCE_PROFILE_FIELD_NAME);
-        }
-        return InstanceProfileCredentialsProvider.create();
-    }
-
-    private static AwsCredentialsProvider 
validateAndGetAccessKeysAuthentications(Map<String, String> configuration)
-            throws CompilationException {
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
-
-        // accessKeyId authentication
-        if (accessKeyId == null) {
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
-                    SECRET_ACCESS_KEY_FIELD_NAME);
-        }
-        if (secretAccessKey == null) {
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
SECRET_ACCESS_KEY_FIELD_NAME,
-                    ACCESS_KEY_ID_FIELD_NAME);
-        }
-
-        String notAllowed = getNonNull(configuration, EXTERNAL_ID_FIELD_NAME);
-        if (notAllowed != null) {
-            throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                    INSTANCE_PROFILE_FIELD_NAME);
-        }
-
-        // use session token if provided
-        if (sessionToken != null) {
-            return StaticCredentialsProvider
-                    .create(AwsSessionCredentials.create(accessKeyId, 
secretAccessKey, sessionToken));
-        } else {
-            return 
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, 
secretAccessKey));
-        }
-    }
-
-    private static boolean noAuth(Map<String, String> configuration) {
-        return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, 
ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
-                ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, 
SESSION_TOKEN_FIELD_NAME) == null;
-    }
-
-    private static String getNonNull(Map<String, String> configuration, 
String... fieldNames) {
-        for (String fieldName : fieldNames) {
-            if (configuration.get(fieldName) != null) {
-                return fieldName;
-            }
-        }
-        return null;
-    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 3cfccb47e2..a2b50e1fc0 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -18,48 +18,18 @@
  */
 package org.apache.asterix.external.util.aws.s3;
 
-import static 
org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
-import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
-import static 
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
-import static 
org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
-import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
-import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
-import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.EXTERNAL_ID_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS_ACCESS;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_PROTOCOL;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMP_ACCESS;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.INSTANCE_PROFILE_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.REGION_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ROLE_ARN_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SESSION_TOKEN_FIELD_NAME;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -67,26 +37,13 @@ import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.api.util.CleanupUtils;
 
-import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import 
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.model.CommonPrefix;
 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
@@ -96,233 +53,12 @@ import 
software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
 import software.amazon.awssdk.services.s3.model.S3Response;
 import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
-import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
-import software.amazon.awssdk.services.sts.model.Credentials;
 
 public class S3Utils {
     private S3Utils() {
         throw new AssertionError("do not instantiate");
     }
 
-    public static boolean isRetryableError(String errorCode) {
-        return errorCode.equals(ERROR_INTERNAL_ERROR) || 
errorCode.equals(ERROR_SLOW_DOWN);
-    }
-
-    /**
-     * Builds the S3 client using the provided configuration
-     *
-     * @param configuration properties
-     * @return S3 client
-     * @throws CompilationException CompilationException
-     */
-    public static S3Client buildAwsS3Client(Map<String, String> configuration) 
throws CompilationException {
-        String regionId = configuration.get(REGION_FIELD_NAME);
-        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
-        Region region = validateAndGetRegion(regionId);
-        AwsCredentialsProvider credentialsProvider = 
buildCredentialsProvider(configuration);
-
-        S3ClientBuilder builder = S3Client.builder();
-        builder.region(region);
-        builder.credentialsProvider(credentialsProvider);
-
-        // Validate the service endpoint if present
-        if (serviceEndpoint != null) {
-            try {
-                URI uri = new URI(serviceEndpoint);
-                try {
-                    builder.endpointOverride(uri);
-                } catch (NullPointerException ex) {
-                    throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
-                }
-            } catch (URISyntaxException ex) {
-                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
-                        String.format("Invalid service endpoint %s", 
serviceEndpoint));
-            }
-        }
-
-        return builder.build();
-    }
-
-    public static AwsCredentialsProvider buildCredentialsProvider(Map<String, 
String> configuration)
-            throws CompilationException {
-        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
-        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
-        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
-        if (noAuth(configuration)) {
-            return AnonymousCredentialsProvider.create();
-        } else if (arnRole != null) {
-            // TODO: Do auth validation and use existing credentials if exist 
already, if not, assume the role
-            return validateAndGetTrustAccountAuthentication(configuration);
-        } else if (instanceProfile != null) {
-            return validateAndGetInstanceProfileAuthentication(configuration);
-        } else if (accessKeyId != null || secretAccessKey != null) {
-            return validateAndGetAccessKeysAuthentications(configuration);
-        } else {
-            if (externalId != null) {
-                throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
-                        EXTERNAL_ID_FIELD_NAME);
-            } else {
-                throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
-                        SESSION_TOKEN_FIELD_NAME);
-            }
-        }
-    }
-
-    /**
-     * Builds the S3 client using the provided configuration
-     *
-     * @param configuration      properties
-     * @param numberOfPartitions number of partitions in the cluster
-     */
-    public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, 
String> configuration,
-            int numberOfPartitions) {
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
-        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
-        //Disable caching S3 FileSystem
-        HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
-
-        /*
-         * Authentication Methods:
-         * 1- Anonymous: no accessKeyId and no secretAccessKey
-         * 2- Temporary: has to provide accessKeyId, secretAccessKey and 
sessionToken
-         * 3- Private: has to provide accessKeyId and secretAccessKey
-         */
-        if (accessKeyId == null) {
-            //Tells hadoop-aws it is an anonymous access
-            conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
-        } else {
-            conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
-            conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
-            if (sessionToken != null) {
-                conf.set(HADOOP_SESSION_TOKEN, sessionToken);
-                //Tells hadoop-aws it is a temporary access
-                conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
-            }
-        }
-
-        /*
-         * This is to allow S3 definition to have path-style form. Should 
always be true to match the current
-         * way we access files in S3
-         */
-        conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
-
-        /*
-         * Set the size of S3 connection pool to be the number of partitions
-         */
-        conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, 
String.valueOf(numberOfPartitions));
-
-        if (serviceEndpoint != null) {
-            // Validation of the URL should be done at hadoop-aws level
-            conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        } else {
-            //Region is ignored and buckets could be found by the central 
endpoint
-            conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
-        }
-    }
-
-    /**
-     * Validate external dataset properties
-     *
-     * @param configuration properties
-     * @throws CompilationException Compilation exception
-     */
-    public static void validateProperties(Map<String, String> configuration, 
SourceLocation srcLoc,
-            IWarningCollector collector) throws CompilationException {
-        if (isDeltaTable(configuration)) {
-            validateDeltaTableProperties(configuration);
-        }
-        // check if the format property is present
-        else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
-            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_FORMAT);
-        }
-
-        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
-        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
-        if (arnRole != null) {
-            String notAllowed = getNonNull(configuration, 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                    SESSION_TOKEN_FIELD_NAME);
-            if (notAllowed != null) {
-                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                        INSTANCE_PROFILE_FIELD_NAME);
-            }
-        } else if (externalId != null) {
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ROLE_ARN_FIELD_NAME,
-                    EXTERNAL_ID_FIELD_NAME);
-        } else if (accessKeyId == null || secretAccessKey == null) {
-            // If one is passed, the other is required
-            if (accessKeyId != null) {
-                throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
SECRET_ACCESS_KEY_FIELD_NAME,
-                        ACCESS_KEY_ID_FIELD_NAME);
-            } else if (secretAccessKey != null) {
-                throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
-                        SECRET_ACCESS_KEY_FIELD_NAME);
-            }
-        }
-
-        validateIncludeExclude(configuration);
-        try {
-            // TODO(htowaileb): maybe something better, this will check to 
ensure type is supported before creation
-            new ExternalDataPrefix(configuration);
-        } catch (AlgebricksException ex) {
-            throw new 
CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
-        }
-
-        // Check if the bucket is present
-        S3Client s3Client = buildAwsS3Client(configuration);
-        S3Response response;
-        boolean useOldApi = false;
-        String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        String prefix = getPrefix(configuration);
-
-        try {
-            response = isBucketEmpty(s3Client, container, prefix, false);
-        } catch (S3Exception ex) {
-            // Method not implemented, try falling back to old API
-            try {
-                // For error code, see 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                if 
(ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
-                    useOldApi = true;
-                    response = isBucketEmpty(s3Client, container, prefix, 
true);
-                } else {
-                    throw ex;
-                }
-            } catch (SdkException ex2) {
-                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2, 
getMessageOrToString(ex));
-            }
-        } catch (SdkException ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
-        } finally {
-            if (s3Client != null) {
-                CleanupUtils.close(s3Client, null);
-            }
-        }
-
-        boolean isEmpty = useOldApi ? ((ListObjectsResponse) 
response).contents().isEmpty()
-                : ((ListObjectsV2Response) response).contents().isEmpty();
-        if (isEmpty && collector.shouldWarn()) {
-            Warning warning = Warning.of(srcLoc, 
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-            collector.warn(warning);
-        }
-
-        // Returns 200 only in case the bucket exists, otherwise, throws an 
exception. However, to
-        // ensure coverage, check if the result is successful as well and not 
only catch exceptions
-        if (!response.sdkHttpResponse().isSuccessful()) {
-            throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
-        }
-    }
-
     /**
      * Checks for a single object in the specified bucket to determine if the 
bucket is empty or not.
      *
@@ -332,7 +68,7 @@ public class S3Utils {
      * @param useOldApi flag whether to use the old API or not
      * @return returns the S3 response
      */
-    private static S3Response isBucketEmpty(S3Client s3Client, String 
container, String prefix, boolean useOldApi) {
+    protected static S3Response isBucketEmpty(S3Client s3Client, String 
container, String prefix, boolean useOldApi) {
         S3Response response;
         if (useOldApi) {
             ListObjectsRequest.Builder listObjectsBuilder = 
ListObjectsRequest.builder();
@@ -352,14 +88,14 @@ public class S3Utils {
      * @param configuration         properties
      * @param includeExcludeMatcher include/exclude matchers to apply
      */
-    public static List<S3Object> listS3Objects(Map<String, String> 
configuration,
+    public static List<S3Object> listS3Objects(IApplicationContext appCtx, 
Map<String, String> configuration,
             AbstractExternalInputStreamFactory.IncludeExcludeMatcher 
includeExcludeMatcher,
             IWarningCollector warningCollector, ExternalDataPrefix 
externalDataPrefix,
             IExternalFilterEvaluator evaluator) throws CompilationException, 
HyracksDataException {
         // Prepare to retrieve the objects
         List<S3Object> filesOnly;
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        S3Client s3Client = buildAwsS3Client(configuration);
+        S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx, 
configuration);
         String prefix = getPrefix(configuration);
 
         try {
@@ -502,10 +238,11 @@ public class S3Utils {
         }
     }
 
-    public static Map<String, List<String>> S3ObjectsOfSingleDepth(Map<String, 
String> configuration, String container,
-            String prefix) throws CompilationException {
+    public static Map<String, List<String>> 
S3ObjectsOfSingleDepth(IApplicationContext appCtx,
+            Map<String, String> configuration, String container, String prefix)
+            throws CompilationException, HyracksDataException {
         // create s3 client
-        S3Client s3Client = buildAwsS3Client(configuration);
+        S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx, 
configuration);
         // fetch all the s3 objects
         return listS3ObjectsOfSingleDepth(s3Client, container, prefix);
     }
@@ -555,116 +292,4 @@ public class S3Utils {
         allObjects.put("folders", folders);
         return allObjects;
     }
-
-    public static Region validateAndGetRegion(String regionId) throws 
CompilationException {
-        List<Region> regions = S3Client.serviceMetadata().regions();
-        Optional<Region> selectedRegion = regions.stream().filter(region -> 
region.id().equals(regionId)).findFirst();
-
-        if (selectedRegion.isEmpty()) {
-            throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
-        }
-        return selectedRegion.get();
-    }
-
-    // TODO(htowaileb): Currently, trust-account is always assuming we have 
instance profile setup in place
-    private static AwsCredentialsProvider 
validateAndGetTrustAccountAuthentication(Map<String, String> configuration)
-            throws CompilationException {
-        String notAllowed = getNonNull(configuration, 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                SESSION_TOKEN_FIELD_NAME);
-        if (notAllowed != null) {
-            throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                    INSTANCE_PROFILE_FIELD_NAME);
-        }
-
-        String regionId = configuration.get(REGION_FIELD_NAME);
-        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
-        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
-        Region region = validateAndGetRegion(regionId);
-
-        AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder();
-        builder.roleArn(arnRole);
-        builder.roleSessionName(UUID.randomUUID().toString());
-        builder.durationSeconds(900); // minimum role assume duration = 900 
seconds (15 minutes), make configurable?
-        if (externalId != null) {
-            builder.externalId(externalId);
-        }
-        AssumeRoleRequest request = builder.build();
-        AwsCredentialsProvider credentialsProvider = 
validateAndGetInstanceProfileAuthentication(configuration);
-
-        // TODO(htowaileb): We shouldn't assume role with each request, rather 
stored the received temporary credentials
-        // and refresh when expired.
-        // assume the role from the provided arn
-        try (StsClient stsClient =
-                
StsClient.builder().region(region).credentialsProvider(credentialsProvider).build())
 {
-            AssumeRoleResponse response = stsClient.assumeRole(request);
-            Credentials credentials = response.credentials();
-            return 
StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(),
-                    credentials.secretAccessKey(), 
credentials.sessionToken()));
-        } catch (SdkException ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
-        }
-    }
-
-    private static AwsCredentialsProvider 
validateAndGetInstanceProfileAuthentication(Map<String, String> configuration)
-            throws CompilationException {
-        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-
-        // only "true" value is allowed
-        if (!"true".equalsIgnoreCase(instanceProfile)) {
-            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
INSTANCE_PROFILE_FIELD_NAME, "true");
-        }
-
-        String notAllowed = getNonNull(configuration, 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                SESSION_TOKEN_FIELD_NAME);
-        if (notAllowed != null) {
-            throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                    INSTANCE_PROFILE_FIELD_NAME);
-        }
-        return InstanceProfileCredentialsProvider.create();
-    }
-
-    private static AwsCredentialsProvider 
validateAndGetAccessKeysAuthentications(Map<String, String> configuration)
-            throws CompilationException {
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
-
-        // accessKeyId authentication
-        if (accessKeyId == null) {
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
-                    SECRET_ACCESS_KEY_FIELD_NAME);
-        }
-        if (secretAccessKey == null) {
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
SECRET_ACCESS_KEY_FIELD_NAME,
-                    ACCESS_KEY_ID_FIELD_NAME);
-        }
-
-        String notAllowed = getNonNull(configuration, EXTERNAL_ID_FIELD_NAME);
-        if (notAllowed != null) {
-            throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                    INSTANCE_PROFILE_FIELD_NAME);
-        }
-
-        // use session token if provided
-        if (sessionToken != null) {
-            return StaticCredentialsProvider
-                    .create(AwsSessionCredentials.create(accessKeyId, 
secretAccessKey, sessionToken));
-        } else {
-            return 
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, 
secretAccessKey));
-        }
-    }
-
-    private static boolean noAuth(Map<String, String> configuration) {
-        return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, 
ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
-                ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, 
SESSION_TOKEN_FIELD_NAME) == null;
-    }
-
-    private static String getNonNull(Map<String, String> configuration, 
String... fieldNames) {
-        for (String fieldName : fieldNames) {
-            if (configuration.get(fieldName) != null) {
-                return fieldName;
-            }
-        }
-        return null;
-    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
index dc20a89420..a4113d125d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -26,6 +26,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -123,7 +124,7 @@ public class HDFSExternalFileWriterFactory implements 
IExternalFileWriterFactory
     }
 
     @Override
-    public void validate() throws AlgebricksException {
+    public void validate(IApplicationContext appCtx) throws 
AlgebricksException {
         Configuration conf = HDFSUtils.configureHDFSwrite(configuration);
         credentials = HDFSUtils.configureHadoopAuthentication(configuration, 
conf);
         try {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
index bdefaa85b2..b1d3a956d8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -20,6 +20,7 @@ package org.apache.asterix.external.writer;
 
 import java.io.File;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -80,7 +81,7 @@ public final class LocalFSExternalFileWriterFactory 
implements IExternalFileWrit
     }
 
     @Override
-    public void validate() throws AlgebricksException {
+    public void validate(IApplicationContext appCtx) throws 
AlgebricksException {
         // A special case validation for a single node cluster
         if (singleNodeCluster && staticPath != null) {
             if (isNonEmptyDirectory(new File(staticPath))) {
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index c1528532e9..ef056c958e 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -994,6 +994,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             Map<String, String> configuration, ARecordType itemType, 
IWarningCollector warningCollector,
             IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws 
AlgebricksException {
         try {
+            configuration.put(ExternalDataConstants.KEY_DATASET, 
dataset.getDatasetName());
             configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE, 
dataset.getDatabaseName());
             configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
                     dataset.getDataverseName().getCanonicalForm());
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 1caea58ce7..e6716dff29 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -148,7 +148,7 @@ public class ExternalWriterProvider {
         String staticPath = staticPathExpr != null ? 
ConstantExpressionUtil.getStringConstant(staticPathExpr) : null;
         IExternalFileWriterFactory fileWriterFactory =
                 ExternalWriterProvider.createWriterFactory(appCtx, sink, 
staticPath, pathSourceLocation);
-        fileWriterFactory.validate();
+        fileWriterFactory.validate(appCtx);
         String fileExtension = ExternalWriterProvider.getFileExtension(sink);
         int maxResult = ExternalWriterProvider.getMaxResult(sink);
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
index 4a75db6906..2aeca4fe9e 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
@@ -18,11 +18,12 @@
  */
 package org.apache.asterix.runtime.writer;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
 public interface IExternalWriterFactoryValidator {
     /**
      * Perform the necessary validation to ensure the writer has the proper 
permissions
      */
-    void validate() throws AlgebricksException;
+    void validate(IApplicationContext appCtx) throws AlgebricksException;
 }

Reply via email to