This is an automated email from the ASF dual-hosted git repository.

htowaileb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 100f25ef7b [ASTERIXDB-3514][EXT]: Cleanup on S3 cross-account auth
100f25ef7b is described below

commit 100f25ef7b6f71d8c4ff0e3362b0948ca6e765a7
Author: Hussain Towaileb <[email protected]>
AuthorDate: Wed Feb 5 14:33:26 2025 +0300

    [ASTERIXDB-3514][EXT]: Cleanup on S3 cross-account auth
    
    Ext-ref: MB-63505
    Change-Id: Iba5dfd5e077f87aa0c1ee1cc81fcf197c9f06762
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19386
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Hussain Towaileb <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../app/external/ExternalCredentialsCache.java     | 47 ++++++------
 .../external/ExternalCredentialsCacheUpdater.java  | 87 ++++++++++++----------
 .../message/UpdateAwsCredentialsCacheRequest.java  | 21 ++++--
 .../asterix/app/translator/QueryTranslator.java    | 49 +++---------
 .../common/external/IExternalCredentialsCache.java |  5 +-
 .../metadata/declared/MetadataProvider.java        | 19 ++---
 6 files changed, 104 insertions(+), 124 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
index ed26f7c9e3..fcdbabda44 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -18,26 +18,20 @@
  */
 package org.apache.asterix.app.external;
 
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.external.IExternalCredentialsCache;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.util.Span;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-
 public class ExternalCredentialsCache implements IExternalCredentialsCache {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private final ConcurrentMap<String, Pair<Span, Object>> cache = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<String, TemporaryCredentials> cache = new 
ConcurrentHashMap<>();
     private final int awsAssumeRoleDuration;
     private final int refreshAwsAssumeRoleThresholdPercentage;
 
@@ -51,7 +45,7 @@ public class ExternalCredentialsCache implements 
IExternalCredentialsCache {
     public synchronized Object get(String key) {
         invalidateCache();
         if (cache.containsKey(key)) {
-            return cache.get(key).getRight();
+            return cache.get(key).getCredentials();
         }
         return null;
     }
@@ -65,20 +59,9 @@ public class ExternalCredentialsCache implements 
IExternalCredentialsCache {
     }
 
     @Override
-    public synchronized void put(String key, String type, Map<String, String> 
credentials) {
-        if 
(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
-            updateAwsCache(key, credentials);
-        }
-    }
-
-    private void updateAwsCache(String name, Map<String, String> credentials) {
-        String accessKeyId = 
credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
-        String sessionToken = 
credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
-
-        AwsSessionCredentials sessionCreds = 
AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
-        cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration, 
TimeUnit.SECONDS), sessionCreds));
-        LOGGER.info("Received and cached new credentials for {}", name);
+    public synchronized void put(String key, Object credentials) {
+        cache.put(key, new 
TemporaryCredentials(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), 
credentials));
+        LOGGER.info("Received and cached new credentials for {}", key);
     }
 
     /**
@@ -86,7 +69,7 @@ public class ExternalCredentialsCache implements 
IExternalCredentialsCache {
      */
     private void invalidateCache() {
         cache.entrySet().removeIf(entry -> {
-            boolean shouldRemove = needsRefresh(entry.getValue().getLeft());
+            boolean shouldRemove = 
needsRefresh(entry.getValue().getDuration());
             if (shouldRemove) {
                 LOGGER.info("Removing cached credentials for {} because it 
expired", entry.getKey());
             }
@@ -106,4 +89,22 @@ public class ExternalCredentialsCache implements 
IExternalCredentialsCache {
         int passedPercentage = (int) (passed * 100);
         return passedPercentage > refreshAwsAssumeRoleThresholdPercentage;
     }
+
+    static class TemporaryCredentials {
+        private final Span duration;
+        private final Object credentials;
+
+        public TemporaryCredentials(Span duration, Object credentials) {
+            this.duration = duration;
+            this.credentials = credentials;
+        }
+
+        public Span getDuration() {
+            return duration;
+        }
+
+        public Object getCredentials() {
+            return credentials;
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
index 29e3239517..4c382d0d03 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -24,7 +24,6 @@ import static 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.
 import static 
org.apache.asterix.common.exceptions.ErrorCode.REJECT_BAD_CLUSTER_STATE;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -39,10 +38,10 @@ import 
org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.external.IExternalCredentialsCache;
 import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.messaging.NCMessageBroker;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -53,6 +52,11 @@ import org.apache.logging.log4j.Logger;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 
+/**
+ * The class is responsible for generating new credentials based on the 
adapter type. Given a request:
+ * - if we are the CC, generate new creds and ask all NCs to update their cache
+ * - if we are the NC, send a message to the CC to generate new creds
+ */
 public class ExternalCredentialsCacheUpdater implements 
IExternalCredentialsCacheUpdater {
 
     private static final Logger LOGGER = LogManager.getLogger();
@@ -66,49 +70,46 @@ public class ExternalCredentialsCacheUpdater implements 
IExternalCredentialsCach
     public synchronized Object generateAndCacheCredentials(Map<String, String> 
configuration)
             throws HyracksDataException, CompilationException {
         IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
-        String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
-        Object credentials = cache.get(name);
+        String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
+        Object credentials = cache.get(key);
         if (credentials != null) {
             return credentials;
         }
 
-        /*
-         * if we are the CC, generate new creds and ask all NCs to update 
their cache
-         * if we are the NC, send a message to the CC to generate new creds 
and ask all NCs to update their cache
-         */
-        if (appCtx instanceof ICcApplicationContext ccAppCtx) {
-            IClusterManagementWork.ClusterState state = 
ccAppCtx.getClusterStateManager().getState();
-            if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
-                throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, 
state);
-            }
+        String type = 
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
+        if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(type)) {
+            credentials = generateAwsCredentials(configuration);
+        }
+
+        return credentials;
+    }
 
-            String accessKeyId;
-            String secretAccessKey;
-            String sessionToken;
-            Map<String, String> credentialsMap = new HashMap<>();
+    // TODO: this can probably be refactored out into something that is 
AWS-specific
+    private Object generateAwsCredentials(Map<String, String> configuration)
+            throws HyracksDataException, CompilationException {
+        String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
+        AwsSessionCredentials credentials;
+        if (appCtx instanceof ICcApplicationContext) {
+            validateClusterState();
             try {
-                LOGGER.info("attempting to update credentials for {}", name);
+                LOGGER.info("attempting to update AWS credentials for {}", 
key);
                 AwsCredentialsProvider newCredentials = 
S3AuthUtils.assumeRoleAndGetCredentials(configuration);
-                LOGGER.info("updated credentials successfully for {}", name);
-                AwsSessionCredentials sessionCredentials = 
(AwsSessionCredentials) newCredentials.resolveCredentials();
-                accessKeyId = sessionCredentials.accessKeyId();
-                secretAccessKey = sessionCredentials.secretAccessKey();
-                sessionToken = sessionCredentials.sessionToken();
+                LOGGER.info("updated AWS credentials successfully for {}", 
key);
+                credentials = (AwsSessionCredentials) 
newCredentials.resolveCredentials();
+                appCtx.getExternalCredentialsCache().put(key, credentials);
             } catch (CompilationException ex) {
-                LOGGER.info("failed to refresh credentials for {}", name, ex);
+                LOGGER.info("failed to refresh AWS credentials for {}", key, 
ex);
                 throw ex;
             }
 
-            // credentials need refreshing
-            credentialsMap.put(S3Constants.ACCESS_KEY_ID_FIELD_NAME, 
accessKeyId);
-            credentialsMap.put(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME, 
secretAccessKey);
-            credentialsMap.put(S3Constants.SESSION_TOKEN_FIELD_NAME, 
sessionToken);
+            String accessKeyId = credentials.accessKeyId();
+            String secretAccessKey = credentials.secretAccessKey();
+            String sessionToken = credentials.sessionToken();
+            UpdateAwsCredentialsCacheRequest request =
+                    new UpdateAwsCredentialsCacheRequest(configuration, 
accessKeyId, secretAccessKey, sessionToken);
 
             // request all NCs to update their credentials cache with the 
latest creds
-            updateNcsCredentialsCache(ccAppCtx, name, credentialsMap, 
configuration);
-            String type = 
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
-            cache.put(name, type, credentialsMap);
-            credentials = AwsSessionCredentials.create(accessKeyId, 
secretAccessKey, sessionToken);
+            updateNcsCredentialsCache(key, request);
         } else {
             NCMessageBroker broker = (NCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
             MessageFuture messageFuture = broker.registerMessageFuture();
@@ -116,7 +117,7 @@ public class ExternalCredentialsCacheUpdater implements 
IExternalCredentialsCach
             long futureId = messageFuture.getFutureId();
             RefreshAwsCredentialsRequest request = new 
RefreshAwsCredentialsRequest(nodeId, futureId, configuration);
             try {
-                LOGGER.info("no valid credentials found for {}, requesting 
credentials from CC", name);
+                LOGGER.info("no valid AWS credentials found for {}, requesting 
AWS credentials from CC", key);
                 broker.sendMessageToPrimaryCC(request);
                 RefreshAwsCredentialsResponse response = 
(RefreshAwsCredentialsResponse) messageFuture
                         .get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
@@ -126,7 +127,7 @@ public class ExternalCredentialsCacheUpdater implements 
IExternalCredentialsCach
                 credentials = 
AwsSessionCredentials.create(response.getAccessKeyId(), 
response.getSecretAccessKey(),
                         response.getSessionToken());
             } catch (Exception ex) {
-                LOGGER.info("failed to refresh credentials for {}", name, ex);
+                LOGGER.info("failed to refresh AWS credentials for {}", key, 
ex);
                 throw HyracksDataException.create(ex);
             } finally {
                 broker.deregisterMessageFuture(futureId);
@@ -135,14 +136,12 @@ public class ExternalCredentialsCacheUpdater implements 
IExternalCredentialsCach
         return credentials;
     }
 
-    private void updateNcsCredentialsCache(ICcApplicationContext appCtx, 
String name, Map<String, String> credentials,
-            Map<String, String> configuration) throws HyracksDataException {
-        final List<String> ncs = new 
ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
+    private void updateNcsCredentialsCache(String key, INcAddressedMessage 
request) throws HyracksDataException {
+        ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+        final List<String> ncs = new 
ArrayList<>(ccAppCtx.getClusterStateManager().getParticipantNodes());
         CCMessageBroker broker = (CCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
-        UpdateAwsCredentialsCacheRequest request = new 
UpdateAwsCredentialsCacheRequest(configuration, credentials);
-
         try {
-            LOGGER.info("requesting all NCs to update their credentials for 
{}", name);
+            LOGGER.info("requesting all NCs to update their credentials for 
{}", key);
             for (String nc : ncs) {
                 broker.sendApplicationMessageToNC(request, nc);
             }
@@ -151,4 +150,12 @@ public class ExternalCredentialsCacheUpdater implements 
IExternalCredentialsCach
             throw HyracksDataException.create(e);
         }
     }
+
+    private void validateClusterState() throws HyracksDataException {
+        ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+        IClusterManagementWork.ClusterState state = 
ccAppCtx.getClusterStateManager().getState();
+        if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
+            throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state);
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
index 438e425f23..753dbf11e5 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
@@ -23,26 +23,31 @@ import java.util.Map;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 
 public class UpdateAwsCredentialsCacheRequest implements INcAddressedMessage {
 
-    private static final Logger LOGGER = LogManager.getLogger();
     private static final long serialVersionUID = 1L;
     private final Map<String, String> configuration;
-    private final Map<String, String> credentials;
+    private final String accessKeyId;
+    private final String secretAccessKey;
+    private final String sessionToken;
 
-    public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, 
Map<String, String> credentials) {
+    public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, 
String accessKeyId,
+            String secretAccessKey, String sessionToken) {
         this.configuration = configuration;
-        this.credentials = credentials;
+        this.accessKeyId = accessKeyId;
+        this.secretAccessKey = secretAccessKey;
+        this.sessionToken = sessionToken;
+
     }
 
     @Override
     public void handle(INcApplicationContext appCtx) {
         String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
-        String type = 
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
-        appCtx.getExternalCredentialsCache().put(name, type, credentials);
+        AwsSessionCredentials credentials = 
AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
+        appCtx.getExternalCredentialsCache().put(name, credentials);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 97bc3cdba0..8ba9a0279e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -579,17 +579,11 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             // async queries are completed after their job completes
             if (ResultDelivery.ASYNC != resultDelivery) {
                 
appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid());
-                postRequestCompleteCleanup(requestParameters);
             }
             Thread.currentThread().setName(threadName);
         }
     }
 
-    protected void postRequestCompleteCleanup(IRequestParameters 
requestParameters) {
-        String uuid = requestParameters.getRequestReference().getUuid();
-        appCtx.getExternalCredentialsCache().delete(uuid);
-    }
-
     protected void configureMetadataProvider(MetadataProvider 
metadataProvider, Map<String, String> config,
             Counter resultSetIdCounter, FileSplit outputFile, 
IRequestParameters requestParameters,
             Statement statement) {
@@ -1038,12 +1032,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     ExternalDataUtils.normalize(properties);
                     ExternalDataUtils.validate(properties);
                     ExternalDataUtils.validateType(properties, (ARecordType) 
itemType);
-                    Map<String, String> propertiesCopy = 
preparePropertiesCopyForValidation(externalDetails, properties,
-                            dd.getSourceLocation(), mdTxnCtx, appCtx, 
metadataProvider);
-                    // do any necessary validation on the copy to avoid 
changing the original and storing it in the metadata
-                    
metadataProvider.setExternalEntityIdFromParts(propertiesCopy, databaseName, 
dataverseName,
-                            datasetName, false);
-                    validateAdapterSpecificProperties(propertiesCopy, 
dd.getSourceLocation(), appCtx);
+                    validateExternalDatasetProperties(externalDetails, 
properties, dd.getSourceLocation(), mdTxnCtx,
+                            appCtx, metadataProvider);
                     datasetDetails = new 
ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
                             TransactionState.COMMIT);
                     break;
@@ -2459,7 +2449,6 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     sourceLoc, EnumSet.of(DropOption.IF_EXISTS), 
requestParameters.isForceDropDataset());
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
-            deleteDatasetCachedCredentials(ds);
             return true;
         } catch (Exception e) {
             LOGGER.error("failed to drop dataset; executing compensating 
operations", e);
@@ -2507,10 +2496,6 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void deleteDatasetCachedCredentials(Dataset dataset) throws 
CompilationException {
-        
appCtx.getExternalCredentialsCache().delete(dataset.getDatasetFullyQualifiedName().toString());
-    }
-
     protected void handleIndexDropStatement(MetadataProvider metadataProvider, 
Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters 
requestParameters) throws Exception {
         IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
@@ -4057,11 +4042,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     copyStmt, itemType, mdTxnCtx, metadataProvider);
             ExternalDataUtils.normalize(properties);
             ExternalDataUtils.validate(properties);
-            Map<String, String> propertiesCopy = 
preparePropertiesCopyForValidation(externalDetails, properties,
-                    copyStmt.getSourceLocation(), mdTxnCtx, appCtx, 
metadataProvider);
-            // do any necessary validation on the copy to avoid changing the 
original and storing it in the metadata
-            metadataProvider.setExternalEntityId(propertiesCopy, dataset);
-            validateAdapterSpecificProperties(propertiesCopy, 
copyStmt.getSourceLocation(), appCtx);
+            validateExternalDatasetProperties(externalDetails, properties, 
copyStmt.getSourceLocation(), mdTxnCtx,
+                    appCtx, metadataProvider);
             CompiledCopyFromFileStatement cls = new 
CompiledCopyFromFileStatement(databaseName, dataverseName,
                     copyStmt.getDatasetName(), itemType, 
externalDetails.getAdapter(), properties);
             cls.setSourceLocation(stmt.getSourceLocation());
@@ -4118,9 +4100,6 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             ResultMetadata outMetadata, IRequestParameters requestParameters, 
Map<String, IAObject> stmtParams,
             Stats stats) throws Exception {
         CopyToStatement copyTo = (CopyToStatement) stmt;
-        Namespace namespace = copyTo.getNamespace();
-        DataverseName dataverseName = namespace.getDataverseName();
-        String databaseName = namespace.getDatabaseName();
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest =
                 (ClientRequest) 
requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -4149,14 +4128,9 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             try {
                 ExternalDetailsDecl edd = copyTo.getExternalDetailsDecl();
-                Map<String, String> properties = 
createAndValidateAdapterConfigurationForCopyToStmt(edd,
+                
edd.setProperties(createAndValidateAdapterConfigurationForCopyToStmt(edd,
                         ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, 
copyTo.getSourceLocation(), mdTxnCtx,
-                        metadataProvider);
-
-                // request id is used to cache credentials if needed, and 
clear them after request is done
-                String uuid = 
requestParameters.getRequestReference().getUuid();
-                metadataProvider.setExternalEntityIdFromParts(properties, 
databaseName, dataverseName, uuid, true);
-                edd.setProperties(properties);
+                        metadataProvider));
 
                 if (ExternalDataConstants.FORMAT_PARQUET
                         
.equalsIgnoreCase(edd.getProperties().get(ExternalDataConstants.KEY_FORMAT))) {
@@ -5585,7 +5559,6 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             // complete async jobs after their job completes
             if (ResultDelivery.ASYNC == resultDelivery) {
                 requestTracker.complete(clientRequest.getId());
-                postRequestCompleteCleanup(requestParameters);
             }
             locker.unlock();
         }
@@ -5802,8 +5775,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
      * @param details external details
      * @param sourceLoc source location
      */
-    protected void normalizeAdapters(ExternalDetailsDecl details, 
SourceLocation sourceLoc)
-            throws CompilationException {
+    private void normalizeAdapters(ExternalDetailsDecl details, SourceLocation 
sourceLoc) throws CompilationException {
         String adapter = details.getAdapter();
         Optional<String> normalizedAdapter =
                 getSupportedAdapters().stream().filter(k -> 
k.equalsIgnoreCase(adapter)).findFirst();
@@ -5817,15 +5789,17 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         return ExternalDataConstants.EXTERNAL_READ_ADAPTERS;
     }
 
-    protected Map<String, String> 
preparePropertiesCopyForValidation(ExternalDetailsDecl externalDetails,
+    protected void validateExternalDatasetProperties(ExternalDetailsDecl 
externalDetails,
             Map<String, String> properties, SourceLocation srcLoc, 
MetadataTransactionContext mdTxnCtx,
             IApplicationContext appCtx, MetadataProvider metadataProvider)
             throws AlgebricksException, HyracksDataException {
+        // Validate adapter specific properties
         normalizeAdapters(externalDetails, srcLoc);
         String adapter = externalDetails.getAdapter();
         Map<String, String> details = new HashMap<>(properties);
         details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter);
-        return details;
+        metadataProvider.setExternalEntityId(details);
+        validateAdapterSpecificProperties(details, srcLoc, appCtx);
     }
 
     protected Map<String, String> 
createAndValidateAdapterConfigurationForCopyToStmt(
@@ -5835,6 +5809,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         String adapterName = externalDetailsDecl.getAdapter();
         Map<String, String> properties = externalDetailsDecl.getProperties();
         properties.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, 
adapterName);
+        md.setExternalEntityId(properties);
         WriterValidationUtil.validateWriterConfiguration(adapterName, 
supportedAdapters, properties, sourceLocation);
         return properties;
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
index 3a9ae1c769..689ff16c63 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.common.external;
 
-import java.util.Map;
-
 public interface IExternalCredentialsCache {
 
     /**
@@ -41,8 +39,7 @@ public interface IExternalCredentialsCache {
      * Updates the credentials cache with the provided credentials for the 
specified name
      *
      * @param key credentials key
-     * @param type credentials type
      * @param credentials credentials to cache
      */
-    void put(String key, String type, Map<String, String> credentials);
+    void put(String key, Object credentials);
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 6f3a9b6b8a..20299d5736 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INamespaceResolver;
@@ -48,7 +49,6 @@ import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.metadata.LockList;
 import org.apache.asterix.common.metadata.MetadataConstants;
@@ -999,7 +999,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE, 
dataset.getDatabaseName());
             configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
                     dataset.getDataverseName().getCanonicalForm());
-            setExternalEntityId(configuration, dataset);
+            setExternalEntityId(configuration);
+            setSourceType(configuration, adapterName);
             return 
AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(),
 adapterName,
                     configuration, itemType, null, warningCollector, 
filterEvaluatorFactory);
         } catch (Exception e) {
@@ -1007,18 +1008,12 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         }
     }
 
-    public void setExternalEntityId(Map<String, String> configuration, Dataset 
dataset) throws AlgebricksException {
-        configuration.put(ExternalDataConstants.KEY_ENTITY_ID, 
dataset.getDatasetFullyQualifiedName().toString());
+    protected void setSourceType(Map<String, String> configuration, String 
adapterName) {
+        
configuration.putIfAbsent(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, 
adapterName);
     }
 
-    public void setExternalEntityIdFromParts(Map<String, String> 
configuration, String database,
-            DataverseName dataverse, String dataset, boolean isUuid) throws 
AlgebricksException {
-        if (isUuid) {
-            configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset);
-        } else {
-            DatasetFullyQualifiedName fqn = new 
DatasetFullyQualifiedName(database, dataverse, dataset);
-            configuration.put(ExternalDataConstants.KEY_ENTITY_ID, 
fqn.toString());
-        }
+    public void setExternalEntityId(Map<String, String> configuration) throws 
AlgebricksException {
+        configuration.put(ExternalDataConstants.KEY_ENTITY_ID, 
UUID.randomUUID().toString());
     }
 
     public TxnId getTxnId() {

Reply via email to