This is an automated email from the ASF dual-hosted git repository.
htowaileb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 100f25ef7b [ASTERIXDB-3514][EXT]: Cleanup on S3 cross-account auth
100f25ef7b is described below
commit 100f25ef7b6f71d8c4ff0e3362b0948ca6e765a7
Author: Hussain Towaileb <[email protected]>
AuthorDate: Wed Feb 5 14:33:26 2025 +0300
[ASTERIXDB-3514][EXT]: Cleanup on S3 cross-account auth
Ext-ref: MB-63505
Change-Id: Iba5dfd5e077f87aa0c1ee1cc81fcf197c9f06762
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19386
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../app/external/ExternalCredentialsCache.java | 47 ++++++------
.../external/ExternalCredentialsCacheUpdater.java | 87 ++++++++++++----------
.../message/UpdateAwsCredentialsCacheRequest.java | 21 ++++--
.../asterix/app/translator/QueryTranslator.java | 49 +++---------
.../common/external/IExternalCredentialsCache.java | 5 +-
.../metadata/declared/MetadataProvider.java | 19 ++---
6 files changed, 104 insertions(+), 124 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
index ed26f7c9e3..fcdbabda44 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -18,26 +18,20 @@
*/
package org.apache.asterix.app.external;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.external.IExternalCredentialsCache;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.util.Span;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-
public class ExternalCredentialsCache implements IExternalCredentialsCache {
private static final Logger LOGGER = LogManager.getLogger();
- private final ConcurrentMap<String, Pair<Span, Object>> cache = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<String, TemporaryCredentials> cache = new
ConcurrentHashMap<>();
private final int awsAssumeRoleDuration;
private final int refreshAwsAssumeRoleThresholdPercentage;
@@ -51,7 +45,7 @@ public class ExternalCredentialsCache implements
IExternalCredentialsCache {
public synchronized Object get(String key) {
invalidateCache();
if (cache.containsKey(key)) {
- return cache.get(key).getRight();
+ return cache.get(key).getCredentials();
}
return null;
}
@@ -65,20 +59,9 @@ public class ExternalCredentialsCache implements
IExternalCredentialsCache {
}
@Override
- public synchronized void put(String key, String type, Map<String, String>
credentials) {
- if
(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
- updateAwsCache(key, credentials);
- }
- }
-
- private void updateAwsCache(String name, Map<String, String> credentials) {
- String accessKeyId =
credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey =
credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken =
credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
-
- AwsSessionCredentials sessionCreds =
AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
- cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration,
TimeUnit.SECONDS), sessionCreds));
- LOGGER.info("Received and cached new credentials for {}", name);
+ public synchronized void put(String key, Object credentials) {
+ cache.put(key, new
TemporaryCredentials(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS),
credentials));
+ LOGGER.info("Received and cached new credentials for {}", key);
}
/**
@@ -86,7 +69,7 @@ public class ExternalCredentialsCache implements
IExternalCredentialsCache {
*/
private void invalidateCache() {
cache.entrySet().removeIf(entry -> {
- boolean shouldRemove = needsRefresh(entry.getValue().getLeft());
+ boolean shouldRemove =
needsRefresh(entry.getValue().getDuration());
if (shouldRemove) {
LOGGER.info("Removing cached credentials for {} because it
expired", entry.getKey());
}
@@ -106,4 +89,22 @@ public class ExternalCredentialsCache implements
IExternalCredentialsCache {
int passedPercentage = (int) (passed * 100);
return passedPercentage > refreshAwsAssumeRoleThresholdPercentage;
}
+
+ static class TemporaryCredentials {
+ private final Span duration;
+ private final Object credentials;
+
+ public TemporaryCredentials(Span duration, Object credentials) {
+ this.duration = duration;
+ this.credentials = credentials;
+ }
+
+ public Span getDuration() {
+ return duration;
+ }
+
+ public Object getCredentials() {
+ return credentials;
+ }
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
index 29e3239517..4c382d0d03 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -24,7 +24,6 @@ import static
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.
import static
org.apache.asterix.common.exceptions.ErrorCode.REJECT_BAD_CLUSTER_STATE;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -39,10 +38,10 @@ import
org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.external.IExternalCredentialsCache;
import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
import org.apache.asterix.messaging.CCMessageBroker;
import org.apache.asterix.messaging.NCMessageBroker;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -53,6 +52,11 @@ import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+/**
+ * The class is responsible for generating new credentials based on the
adapter type. Given a request:
+ * - if we are the CC, generate new creds and ask all NCs to update their cache
+ * - if we are the NC, send a message to the CC to generate new creds
+ */
public class ExternalCredentialsCacheUpdater implements
IExternalCredentialsCacheUpdater {
private static final Logger LOGGER = LogManager.getLogger();
@@ -66,49 +70,46 @@ public class ExternalCredentialsCacheUpdater implements
IExternalCredentialsCach
public synchronized Object generateAndCacheCredentials(Map<String, String>
configuration)
throws HyracksDataException, CompilationException {
IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
- String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
- Object credentials = cache.get(name);
+ String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
+ Object credentials = cache.get(key);
if (credentials != null) {
return credentials;
}
- /*
- * if we are the CC, generate new creds and ask all NCs to update
their cache
- * if we are the NC, send a message to the CC to generate new creds
and ask all NCs to update their cache
- */
- if (appCtx instanceof ICcApplicationContext ccAppCtx) {
- IClusterManagementWork.ClusterState state =
ccAppCtx.getClusterStateManager().getState();
- if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
- throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE,
state);
- }
+ String type =
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
+ if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(type)) {
+ credentials = generateAwsCredentials(configuration);
+ }
+
+ return credentials;
+ }
- String accessKeyId;
- String secretAccessKey;
- String sessionToken;
- Map<String, String> credentialsMap = new HashMap<>();
+ // TODO: this can probably be refactored out into something that is
AWS-specific
+ private Object generateAwsCredentials(Map<String, String> configuration)
+ throws HyracksDataException, CompilationException {
+ String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
+ AwsSessionCredentials credentials;
+ if (appCtx instanceof ICcApplicationContext) {
+ validateClusterState();
try {
- LOGGER.info("attempting to update credentials for {}", name);
+ LOGGER.info("attempting to update AWS credentials for {}",
key);
AwsCredentialsProvider newCredentials =
S3AuthUtils.assumeRoleAndGetCredentials(configuration);
- LOGGER.info("updated credentials successfully for {}", name);
- AwsSessionCredentials sessionCredentials =
(AwsSessionCredentials) newCredentials.resolveCredentials();
- accessKeyId = sessionCredentials.accessKeyId();
- secretAccessKey = sessionCredentials.secretAccessKey();
- sessionToken = sessionCredentials.sessionToken();
+ LOGGER.info("updated AWS credentials successfully for {}",
key);
+ credentials = (AwsSessionCredentials)
newCredentials.resolveCredentials();
+ appCtx.getExternalCredentialsCache().put(key, credentials);
} catch (CompilationException ex) {
- LOGGER.info("failed to refresh credentials for {}", name, ex);
+ LOGGER.info("failed to refresh AWS credentials for {}", key,
ex);
throw ex;
}
- // credentials need refreshing
- credentialsMap.put(S3Constants.ACCESS_KEY_ID_FIELD_NAME,
accessKeyId);
- credentialsMap.put(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME,
secretAccessKey);
- credentialsMap.put(S3Constants.SESSION_TOKEN_FIELD_NAME,
sessionToken);
+ String accessKeyId = credentials.accessKeyId();
+ String secretAccessKey = credentials.secretAccessKey();
+ String sessionToken = credentials.sessionToken();
+ UpdateAwsCredentialsCacheRequest request =
+ new UpdateAwsCredentialsCacheRequest(configuration,
accessKeyId, secretAccessKey, sessionToken);
// request all NCs to update their credentials cache with the
latest creds
- updateNcsCredentialsCache(ccAppCtx, name, credentialsMap,
configuration);
- String type =
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
- cache.put(name, type, credentialsMap);
- credentials = AwsSessionCredentials.create(accessKeyId,
secretAccessKey, sessionToken);
+ updateNcsCredentialsCache(key, request);
} else {
NCMessageBroker broker = (NCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
MessageFuture messageFuture = broker.registerMessageFuture();
@@ -116,7 +117,7 @@ public class ExternalCredentialsCacheUpdater implements
IExternalCredentialsCach
long futureId = messageFuture.getFutureId();
RefreshAwsCredentialsRequest request = new
RefreshAwsCredentialsRequest(nodeId, futureId, configuration);
try {
- LOGGER.info("no valid credentials found for {}, requesting
credentials from CC", name);
+ LOGGER.info("no valid AWS credentials found for {}, requesting
AWS credentials from CC", key);
broker.sendMessageToPrimaryCC(request);
RefreshAwsCredentialsResponse response =
(RefreshAwsCredentialsResponse) messageFuture
.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
@@ -126,7 +127,7 @@ public class ExternalCredentialsCacheUpdater implements
IExternalCredentialsCach
credentials =
AwsSessionCredentials.create(response.getAccessKeyId(),
response.getSecretAccessKey(),
response.getSessionToken());
} catch (Exception ex) {
- LOGGER.info("failed to refresh credentials for {}", name, ex);
+ LOGGER.info("failed to refresh AWS credentials for {}", key,
ex);
throw HyracksDataException.create(ex);
} finally {
broker.deregisterMessageFuture(futureId);
@@ -135,14 +136,12 @@ public class ExternalCredentialsCacheUpdater implements
IExternalCredentialsCach
return credentials;
}
- private void updateNcsCredentialsCache(ICcApplicationContext appCtx,
String name, Map<String, String> credentials,
- Map<String, String> configuration) throws HyracksDataException {
- final List<String> ncs = new
ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
+ private void updateNcsCredentialsCache(String key, INcAddressedMessage
request) throws HyracksDataException {
+ ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+ final List<String> ncs = new
ArrayList<>(ccAppCtx.getClusterStateManager().getParticipantNodes());
CCMessageBroker broker = (CCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
- UpdateAwsCredentialsCacheRequest request = new
UpdateAwsCredentialsCacheRequest(configuration, credentials);
-
try {
- LOGGER.info("requesting all NCs to update their credentials for
{}", name);
+ LOGGER.info("requesting all NCs to update their credentials for
{}", key);
for (String nc : ncs) {
broker.sendApplicationMessageToNC(request, nc);
}
@@ -151,4 +150,12 @@ public class ExternalCredentialsCacheUpdater implements
IExternalCredentialsCach
throw HyracksDataException.create(e);
}
}
+
+ private void validateClusterState() throws HyracksDataException {
+ ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+ IClusterManagementWork.ClusterState state =
ccAppCtx.getClusterStateManager().getState();
+ if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
+ throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state);
+ }
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
index 438e425f23..753dbf11e5 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
@@ -23,26 +23,31 @@ import java.util.Map;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
public class UpdateAwsCredentialsCacheRequest implements INcAddressedMessage {
- private static final Logger LOGGER = LogManager.getLogger();
private static final long serialVersionUID = 1L;
private final Map<String, String> configuration;
- private final Map<String, String> credentials;
+ private final String accessKeyId;
+ private final String secretAccessKey;
+ private final String sessionToken;
- public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration,
Map<String, String> credentials) {
+ public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration,
String accessKeyId,
+ String secretAccessKey, String sessionToken) {
this.configuration = configuration;
- this.credentials = credentials;
+ this.accessKeyId = accessKeyId;
+ this.secretAccessKey = secretAccessKey;
+ this.sessionToken = sessionToken;
+
}
@Override
public void handle(INcApplicationContext appCtx) {
String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
- String type =
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
- appCtx.getExternalCredentialsCache().put(name, type, credentials);
+ AwsSessionCredentials credentials =
AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
+ appCtx.getExternalCredentialsCache().put(name, credentials);
}
@Override
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 97bc3cdba0..8ba9a0279e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -579,17 +579,11 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
// async queries are completed after their job completes
if (ResultDelivery.ASYNC != resultDelivery) {
appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid());
- postRequestCompleteCleanup(requestParameters);
}
Thread.currentThread().setName(threadName);
}
}
- protected void postRequestCompleteCleanup(IRequestParameters
requestParameters) {
- String uuid = requestParameters.getRequestReference().getUuid();
- appCtx.getExternalCredentialsCache().delete(uuid);
- }
-
protected void configureMetadataProvider(MetadataProvider
metadataProvider, Map<String, String> config,
Counter resultSetIdCounter, FileSplit outputFile,
IRequestParameters requestParameters,
Statement statement) {
@@ -1038,12 +1032,8 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
ExternalDataUtils.validateType(properties, (ARecordType)
itemType);
- Map<String, String> propertiesCopy =
preparePropertiesCopyForValidation(externalDetails, properties,
- dd.getSourceLocation(), mdTxnCtx, appCtx,
metadataProvider);
- // do any necessary validation on the copy to avoid
changing the original and storing it in the metadata
-
metadataProvider.setExternalEntityIdFromParts(propertiesCopy, databaseName,
dataverseName,
- datasetName, false);
- validateAdapterSpecificProperties(propertiesCopy,
dd.getSourceLocation(), appCtx);
+ validateExternalDatasetProperties(externalDetails,
properties, dd.getSourceLocation(), mdTxnCtx,
+ appCtx, metadataProvider);
datasetDetails = new
ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
TransactionState.COMMIT);
break;
@@ -2459,7 +2449,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
sourceLoc, EnumSet.of(DropOption.IF_EXISTS),
requestParameters.isForceDropDataset());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
- deleteDatasetCachedCredentials(ds);
return true;
} catch (Exception e) {
LOGGER.error("failed to drop dataset; executing compensating
operations", e);
@@ -2507,10 +2496,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
}
}
- protected void deleteDatasetCachedCredentials(Dataset dataset) throws
CompilationException {
-
appCtx.getExternalCredentialsCache().delete(dataset.getDatasetFullyQualifiedName().toString());
- }
-
protected void handleIndexDropStatement(MetadataProvider metadataProvider,
Statement stmt,
IHyracksClientConnection hcc, IRequestParameters
requestParameters) throws Exception {
IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
@@ -4057,11 +4042,8 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
copyStmt, itemType, mdTxnCtx, metadataProvider);
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
- Map<String, String> propertiesCopy =
preparePropertiesCopyForValidation(externalDetails, properties,
- copyStmt.getSourceLocation(), mdTxnCtx, appCtx,
metadataProvider);
- // do any necessary validation on the copy to avoid changing the
original and storing it in the metadata
- metadataProvider.setExternalEntityId(propertiesCopy, dataset);
- validateAdapterSpecificProperties(propertiesCopy,
copyStmt.getSourceLocation(), appCtx);
+ validateExternalDatasetProperties(externalDetails, properties,
copyStmt.getSourceLocation(), mdTxnCtx,
+ appCtx, metadataProvider);
CompiledCopyFromFileStatement cls = new
CompiledCopyFromFileStatement(databaseName, dataverseName,
copyStmt.getDatasetName(), itemType,
externalDetails.getAdapter(), properties);
cls.setSourceLocation(stmt.getSourceLocation());
@@ -4118,9 +4100,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
ResultMetadata outMetadata, IRequestParameters requestParameters,
Map<String, IAObject> stmtParams,
Stats stats) throws Exception {
CopyToStatement copyTo = (CopyToStatement) stmt;
- Namespace namespace = copyTo.getNamespace();
- DataverseName dataverseName = namespace.getDataverseName();
- String databaseName = namespace.getDatabaseName();
final IRequestTracker requestTracker = appCtx.getRequestTracker();
final ClientRequest clientRequest =
(ClientRequest)
requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -4149,14 +4128,9 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
ExternalDetailsDecl edd = copyTo.getExternalDetailsDecl();
- Map<String, String> properties =
createAndValidateAdapterConfigurationForCopyToStmt(edd,
+
edd.setProperties(createAndValidateAdapterConfigurationForCopyToStmt(edd,
ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS,
copyTo.getSourceLocation(), mdTxnCtx,
- metadataProvider);
-
- // request id is used to cache credentials if needed, and
clear them after request is done
- String uuid =
requestParameters.getRequestReference().getUuid();
- metadataProvider.setExternalEntityIdFromParts(properties,
databaseName, dataverseName, uuid, true);
- edd.setProperties(properties);
+ metadataProvider));
if (ExternalDataConstants.FORMAT_PARQUET
.equalsIgnoreCase(edd.getProperties().get(ExternalDataConstants.KEY_FORMAT))) {
@@ -5585,7 +5559,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
// complete async jobs after their job completes
if (ResultDelivery.ASYNC == resultDelivery) {
requestTracker.complete(clientRequest.getId());
- postRequestCompleteCleanup(requestParameters);
}
locker.unlock();
}
@@ -5802,8 +5775,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
* @param details external details
* @param sourceLoc source location
*/
- protected void normalizeAdapters(ExternalDetailsDecl details,
SourceLocation sourceLoc)
- throws CompilationException {
+ private void normalizeAdapters(ExternalDetailsDecl details, SourceLocation
sourceLoc) throws CompilationException {
String adapter = details.getAdapter();
Optional<String> normalizedAdapter =
getSupportedAdapters().stream().filter(k ->
k.equalsIgnoreCase(adapter)).findFirst();
@@ -5817,15 +5789,17 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
return ExternalDataConstants.EXTERNAL_READ_ADAPTERS;
}
- protected Map<String, String>
preparePropertiesCopyForValidation(ExternalDetailsDecl externalDetails,
+ protected void validateExternalDatasetProperties(ExternalDetailsDecl
externalDetails,
Map<String, String> properties, SourceLocation srcLoc,
MetadataTransactionContext mdTxnCtx,
IApplicationContext appCtx, MetadataProvider metadataProvider)
throws AlgebricksException, HyracksDataException {
+ // Validate adapter specific properties
normalizeAdapters(externalDetails, srcLoc);
String adapter = externalDetails.getAdapter();
Map<String, String> details = new HashMap<>(properties);
details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter);
- return details;
+ metadataProvider.setExternalEntityId(details);
+ validateAdapterSpecificProperties(details, srcLoc, appCtx);
}
protected Map<String, String>
createAndValidateAdapterConfigurationForCopyToStmt(
@@ -5835,6 +5809,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
String adapterName = externalDetailsDecl.getAdapter();
Map<String, String> properties = externalDetailsDecl.getProperties();
properties.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE,
adapterName);
+ md.setExternalEntityId(properties);
WriterValidationUtil.validateWriterConfiguration(adapterName,
supportedAdapters, properties, sourceLocation);
return properties;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
index 3a9ae1c769..689ff16c63 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.common.external;
-import java.util.Map;
-
public interface IExternalCredentialsCache {
/**
@@ -41,8 +39,7 @@ public interface IExternalCredentialsCache {
* Updates the credentials cache with the provided credentials for the
specified name
*
* @param key credentials key
- * @param type credentials type
* @param credentials credentials to cache
*/
- void put(String key, String type, Map<String, String> credentials);
+ void put(String key, Object credentials);
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 6f3a9b6b8a..20299d5736 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.asterix.common.api.INamespaceResolver;
@@ -48,7 +49,6 @@ import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.metadata.LockList;
import org.apache.asterix.common.metadata.MetadataConstants;
@@ -999,7 +999,8 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE,
dataset.getDatabaseName());
configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
dataset.getDataverseName().getCanonicalForm());
- setExternalEntityId(configuration, dataset);
+ setExternalEntityId(configuration);
+ setSourceType(configuration, adapterName);
return
AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(),
adapterName,
configuration, itemType, null, warningCollector,
filterEvaluatorFactory);
} catch (Exception e) {
@@ -1007,18 +1008,12 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
}
}
- public void setExternalEntityId(Map<String, String> configuration, Dataset
dataset) throws AlgebricksException {
- configuration.put(ExternalDataConstants.KEY_ENTITY_ID,
dataset.getDatasetFullyQualifiedName().toString());
+ protected void setSourceType(Map<String, String> configuration, String
adapterName) {
+
configuration.putIfAbsent(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE,
adapterName);
}
- public void setExternalEntityIdFromParts(Map<String, String>
configuration, String database,
- DataverseName dataverse, String dataset, boolean isUuid) throws
AlgebricksException {
- if (isUuid) {
- configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset);
- } else {
- DatasetFullyQualifiedName fqn = new
DatasetFullyQualifiedName(database, dataverse, dataset);
- configuration.put(ExternalDataConstants.KEY_ENTITY_ID,
fqn.toString());
- }
+ public void setExternalEntityId(Map<String, String> configuration) throws
AlgebricksException {
+ configuration.put(ExternalDataConstants.KEY_ENTITY_ID,
UUID.randomUUID().toString());
}
public TxnId getTxnId() {