This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new d21a38bf55 [ASTERIXDB-3514][EXT]: Assume role only when temporary
credentials expire
d21a38bf55 is described below
commit d21a38bf553a2575741ffef438b4959cee6bada4
Author: Hussain Towaileb <[email protected]>
AuthorDate: Wed Nov 13 21:56:54 2024 +0300
[ASTERIXDB-3514][EXT]: Assume role only when temporary credentials expire
- user model changes: no
- storage format changes: no
- interface changes: yes
When using temporary credentials obtained by assuming
a role, cache the credentials and only refresh
them when they expire.
Ext-ref: MB-63505
Change-Id: I622853b794f81cd4bda84964a6cf7041d889d20f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19067
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Peeyush Gupta <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Hussain Towaileb <[email protected]>
---
.../asterix/app/cc/CcApplicationContext.java | 18 +
.../app/external/ExternalCredentialsCache.java | 97 ++++
.../external/ExternalCredentialsCacheUpdater.java | 153 ++++++
.../app/message/RefreshAwsCredentialsRequest.java | 81 +++
.../app/message/RefreshAwsCredentialsResponse.java | 73 +++
.../message/UpdateAwsCredentialsCacheRequest.java | 46 ++
.../apache/asterix/app/nc/NCAppRuntimeContext.java | 18 +
.../AbstractCloudExternalFileWriterFactory.java | 11 +-
.../cloud/writer/GCSExternalFileWriterFactory.java | 5 +-
.../cloud/writer/S3ExternalFileWriterFactory.java | 9 +-
.../asterix/common/api/IApplicationContext.java | 12 +
.../asterix/common/exceptions/ErrorCode.java | 2 +
.../common/external/IExternalCredentialsCache.java | 48 ++
.../IExternalCredentialsCacheUpdater.java} | 17 +-
.../src/main/resources/asx_errormsg/en.properties | 2 +
.../input/record/reader/aws/AwsS3InputStream.java | 11 +-
.../record/reader/aws/AwsS3InputStreamFactory.java | 10 +-
.../aws/parquet/AwsS3ParquetReaderFactory.java | 6 +-
.../external/util/ExternalDataConstants.java | 1 +
.../asterix/external/util/ExternalDataUtils.java | 4 +-
.../util/aws/s3/{S3Utils.java => S3AuthUtils.java} | 558 +++++++--------------
.../asterix/external/util/aws/s3/S3Utils.java | 391 +--------------
.../writer/HDFSExternalFileWriterFactory.java | 3 +-
.../writer/LocalFSExternalFileWriterFactory.java | 3 +-
.../metadata/declared/MetadataProvider.java | 1 +
.../metadata/provider/ExternalWriterProvider.java | 2 +-
.../writer/IExternalWriterFactoryValidator.java | 3 +-
27 files changed, 793 insertions(+), 792 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index 6ea7aebf83..e892d0445e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
+import org.apache.asterix.app.external.ExternalCredentialsCache;
+import org.apache.asterix.app.external.ExternalCredentialsCacheUpdater;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IConfigValidator;
import org.apache.asterix.common.api.IConfigValidatorFactory;
@@ -55,6 +57,8 @@ import
org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
import org.apache.asterix.common.external.IAdapterFactoryService;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
@@ -127,6 +131,8 @@ public class CcApplicationContext implements
ICcApplicationContext {
private final IOManager ioManager;
private final INamespacePathResolver namespacePathResolver;
private final INamespaceResolver namespaceResolver;
+ private final IExternalCredentialsCache externalCredentialsCache;
+ private final IExternalCredentialsCacheUpdater
externalCredentialsCacheUpdater;
public CcApplicationContext(ICCServiceContext ccServiceCtx,
HyracksConnection hcc,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
IGlobalRecoveryManager globalRecoveryManager,
@@ -177,6 +183,8 @@ public class CcApplicationContext implements
ICcApplicationContext {
this.globalTxManager = globalTxManager;
this.ioManager = ioManager;
dataPartitioningProvider = DataPartitioningProvider.create(this);
+ externalCredentialsCache = new ExternalCredentialsCache();
+ externalCredentialsCacheUpdater = new
ExternalCredentialsCacheUpdater(this);
}
@Override
@@ -415,4 +423,14 @@ public class CcApplicationContext implements
ICcApplicationContext {
public IOManager getIoManager() {
return ioManager;
}
+
+    /**
+     * @return the external credentials cache created when this context was constructed
+     */
+    @Override
+    public IExternalCredentialsCache getExternalCredentialsCache() {
+        return this.externalCredentialsCache;
+    }
+
+    /**
+     * @return the credentials cache updater created (bound to this context) when this context was constructed
+     */
+    @Override
+    public IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater() {
+        return this.externalCredentialsCacheUpdater;
+    }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
new file mode 100644
index 0000000000..0ddca4e700
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.metadata.MetadataConstants;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.util.Span;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+public class ExternalCredentialsCache implements IExternalCredentialsCache {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final ConcurrentMap<String, Pair<Span, Object>> cache = new
ConcurrentHashMap<>();
+
+ public ExternalCredentialsCache() {
+ }
+
+ @Override
+ public synchronized Object getCredentials(Map<String, String>
configuration) {
+ String name = getName(configuration);
+ if (cache.containsKey(name) &&
!needsRefresh(cache.get(name).getLeft())) {
+ return cache.get(name).getRight();
+ }
+ return null;
+ }
+
+ @Override
+ public synchronized void updateCache(Map<String, String> configuration,
Map<String, String> credentials) {
+ String type = configuration.get(ExternalDataConstants.KEY_READER);
+ if
(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
+ updateAwsCache(configuration, credentials);
+ }
+ }
+
+ @Override
+ public String getName(Map<String, String> configuration) {
+ String database =
configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE);
+ if (database == null) {
+ database = MetadataConstants.DEFAULT_DATABASE;
+ }
+ String dataverse =
configuration.get(ExternalDataConstants.KEY_DATASET_DATAVERSE);
+ String dataset = configuration.get(ExternalDataConstants.KEY_DATASET);
+ return String.join(".", database, dataverse, dataset);
+ }
+
+ private void updateAwsCache(Map<String, String> configuration, Map<String,
String> credentials) {
+ String accessKeyId =
credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
+ String secretAccessKey =
credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
+ String sessionToken =
credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
+ doUpdateAwsCache(configuration,
AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
+ }
+
+ private void doUpdateAwsCache(Map<String, String> configuration,
AwsSessionCredentials credentials) {
+ // TODO(htowaileb): Set default expiration value
+ String name = getName(configuration);
+ cache.put(name, Pair.of(Span.start(15, TimeUnit.MINUTES),
credentials));
+ LOGGER.info("Received and cached new credentials for {}", name);
+ }
+
+ /**
+ * Refresh if the remaining time is half or less than the total expiration
time
+ *
+ * @param span expiration span
+ * @return true if the remaining time is half or less than the total
expiration time, false otherwise
+ */
+ private boolean needsRefresh(Span span) {
+ // TODO(htowaileb): At what % (and should be configurable?) do we
decide it's better to refresh credentials
+ return (double) span.remaining(TimeUnit.MINUTES) /
span.getSpan(TimeUnit.MINUTES) < 0.5;
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
new file mode 100644
index 0000000000..f07caaae97
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import static
org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+import static
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE;
+import static
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED;
+import static
org.apache.asterix.common.exceptions.ErrorCode.REJECT_BAD_CLUSTER_STATE;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.message.RefreshAwsCredentialsRequest;
+import org.apache.asterix.app.message.RefreshAwsCredentialsResponse;
+import org.apache.asterix.app.message.UpdateAwsCredentialsCacheRequest;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+public class ExternalCredentialsCacheUpdater implements
IExternalCredentialsCacheUpdater {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final IApplicationContext appCtx;
+
+ public ExternalCredentialsCacheUpdater(IApplicationContext appCtx) {
+ this.appCtx = appCtx;
+ }
+
+ @Override
+ public synchronized Object generateAndCacheCredentials(Map<String, String>
configuration)
+ throws HyracksDataException, CompilationException {
+ IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
+ Object credentials = cache.getCredentials(configuration);
+ if (credentials != null) {
+ return credentials;
+ }
+
+ /*
+ * if we are the CC, generate new creds and ask all NCs to update
their cache
+ * if we are the NC, send a message to the CC to generate new creds
and ask all NCs to update their cache
+ */
+ String name = cache.getName(configuration);
+ if (appCtx instanceof ICcApplicationContext) {
+ ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+ IClusterManagementWork.ClusterState state =
ccAppCtx.getClusterStateManager().getState();
+ if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
+ throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE,
state);
+ }
+
+ String accessKeyId;
+ String secretAccessKey;
+ String sessionToken;
+ Map<String, String> credentialsMap = new HashMap<>();
+ try {
+ LOGGER.info("attempting to update credentials for {}", name);
+ AwsCredentialsProvider newCredentials =
S3AuthUtils.assumeRoleAndGetCredentials(configuration);
+ LOGGER.info("updated credentials successfully for {}", name);
+ AwsSessionCredentials sessionCredentials =
(AwsSessionCredentials) newCredentials.resolveCredentials();
+ accessKeyId = sessionCredentials.accessKeyId();
+ secretAccessKey = sessionCredentials.secretAccessKey();
+ sessionToken = sessionCredentials.sessionToken();
+ } catch (CompilationException ex) {
+ LOGGER.info("failed to refresh credentials for {}", name, ex);
+ throw ex;
+ }
+
+ // credentials need refreshing
+ credentialsMap.put(S3Constants.ACCESS_KEY_ID_FIELD_NAME,
accessKeyId);
+ credentialsMap.put(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME,
secretAccessKey);
+ credentialsMap.put(S3Constants.SESSION_TOKEN_FIELD_NAME,
sessionToken);
+
+ // request all NCs to update their credentials cache with the
latest creds
+ updateNcsCredentialsCache(ccAppCtx, name, credentialsMap,
configuration);
+ cache.updateCache(configuration, credentialsMap);
+ credentials = AwsSessionCredentials.create(accessKeyId,
secretAccessKey, sessionToken);
+ } else {
+ NCMessageBroker broker = (NCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
+ MessageFuture messageFuture = broker.registerMessageFuture();
+ String nodeId = ((INCServiceContext)
appCtx.getServiceContext()).getNodeId();
+ long futureId = messageFuture.getFutureId();
+ RefreshAwsCredentialsRequest request = new
RefreshAwsCredentialsRequest(nodeId, futureId, configuration);
+ try {
+ LOGGER.info("no valid credentials found for {}, requesting
credentials from CC", name);
+ broker.sendMessageToPrimaryCC(request);
+ RefreshAwsCredentialsResponse response =
(RefreshAwsCredentialsResponse) messageFuture
+ .get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ if (response.getFailure() != null) {
+ throw HyracksDataException.create(response.getFailure());
+ }
+ credentials =
AwsSessionCredentials.create(response.getAccessKeyId(),
response.getSecretAccessKey(),
+ response.getSessionToken());
+ } catch (Exception ex) {
+ LOGGER.info("failed to refresh credentials for {}", name, ex);
+ throw HyracksDataException.create(ex);
+ } finally {
+ broker.deregisterMessageFuture(futureId);
+ }
+ }
+ return credentials;
+ }
+
+ private void updateNcsCredentialsCache(ICcApplicationContext appCtx,
String name, Map<String, String> credentials,
+ Map<String, String> configuration) throws HyracksDataException {
+ final List<String> ncs = new
ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
+ CCMessageBroker broker = (CCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
+ UpdateAwsCredentialsCacheRequest request = new
UpdateAwsCredentialsCacheRequest(configuration, credentials);
+
+ try {
+ LOGGER.info("requesting all NCs to update their credentials for
{}", name);
+ for (String nc : ncs) {
+ broker.sendApplicationMessageToNC(request, nc);
+ }
+ } catch (Exception e) {
+ LOGGER.info("failed to send message to nc", e);
+ throw HyracksDataException.create(e);
+ }
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java
new file mode 100644
index 0000000000..32de92d45b
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+/**
+ * NC-to-CC message asking the CC to produce (or return cached) AWS session credentials for a configuration.
+ * The CC always answers the requesting node with a {@link RefreshAwsCredentialsResponse}, carrying either the
+ * credentials or the failure.
+ */
+public class RefreshAwsCredentialsRequest implements ICcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final String nodeId;
+    private final long reqId;
+    private final Map<String, String> configuration;
+
+    /**
+     * @param nodeId id of the NC that should receive the response
+     * @param reqId id of the message future the NC is waiting on
+     * @param configuration adapter configuration identifying the dataset/role
+     */
+    public RefreshAwsCredentialsRequest(String nodeId, long reqId, Map<String, String> configuration) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public final void handle(ICcApplicationContext appCtx) throws HyracksDataException {
+        try {
+            AwsSessionCredentials creds = (AwsSessionCredentials) appCtx.getExternalCredentialsCacheUpdater()
+                    .generateAndCacheCredentials(configuration);
+            // respond with the credentials
+            respond(appCtx, new RefreshAwsCredentialsResponse(reqId, creds.accessKeyId(), creds.secretAccessKey(),
+                    creds.sessionToken(), null));
+        } catch (Exception e) {
+            LOGGER.info("failed to refresh credentials", e);
+            respond(appCtx, new RefreshAwsCredentialsResponse(reqId, null, null, null, e));
+        }
+    }
+
+    /** Sends {@code response} back to the requesting NC, wrapping any messaging failure. */
+    private void respond(ICcApplicationContext appCtx, RefreshAwsCredentialsResponse response)
+            throws HyracksDataException {
+        CCMessageBroker ccBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            ccBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception sendFailure) {
+            LOGGER.info("failed to send reply to nc", sendFailure);
+            throw HyracksDataException.create(sendFailure);
+        }
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java
new file mode 100644
index 0000000000..9ea0e1165b
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+
+/**
+ * CC-to-NC reply to a {@link RefreshAwsCredentialsRequest}. On success the three credential parts are set and
+ * {@code failure} is null; on failure the credential parts are null and {@code failure} holds the cause.
+ */
+public class RefreshAwsCredentialsResponse implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final String accessKeyId;
+    private final String secretAccessKey;
+    private final String sessionToken;
+    private final Throwable failure;
+
+    public RefreshAwsCredentialsResponse(long reqId, String accessKeyId, String secretAccessKey, String sessionToken,
+            Throwable failure) {
+        this.reqId = reqId;
+        this.accessKeyId = accessKeyId;
+        this.secretAccessKey = secretAccessKey;
+        this.sessionToken = sessionToken;
+        this.failure = failure;
+    }
+
+    /** Completes the waiting message future (if it is still registered) with this response. */
+    @Override
+    public void handle(INcApplicationContext appCtx) {
+        NCMessageBroker ncBroker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        MessageFuture pending = ncBroker.deregisterMessageFuture(reqId);
+        if (pending == null) {
+            // nobody is waiting any more (e.g. the requester already gave up)
+            return;
+        }
+        pending.complete(this);
+    }
+
+    public String getAccessKeyId() {
+        return this.accessKeyId;
+    }
+
+    public String getSecretAccessKey() {
+        return this.secretAccessKey;
+    }
+
+    public String getSessionToken() {
+        return this.sessionToken;
+    }
+
+    public Throwable getFailure() {
+        return this.failure;
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
new file mode 100644
index 0000000000..44d4c21305
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+
+/**
+ * CC-to-NC message instructing the receiving node to store the given credentials in its external credentials
+ * cache for the given configuration.
+ */
+public class UpdateAwsCredentialsCacheRequest implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Map<String, String> configuration;
+    private final Map<String, String> credentials;
+
+    /**
+     * @param configuration adapter configuration used to derive the cache key
+     * @param credentials credential parts keyed by the S3 credential field names
+     */
+    public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, Map<String, String> credentials) {
+        this.configuration = configuration;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) {
+        appCtx.getExternalCredentialsCache().updateCache(this.configuration, this.credentials);
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 7da3838838..8c2a0ab95d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -32,6 +32,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.app.external.ExternalCredentialsCache;
+import org.apache.asterix.app.external.ExternalCredentialsCacheUpdater;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.cloud.CloudConfigurator;
import org.apache.asterix.cloud.LocalPartitionBootstrapper;
@@ -63,6 +65,8 @@ import
org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.context.DiskWriteRateLimiterProvider;
import org.apache.asterix.common.context.GlobalVirtualBufferCache;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
@@ -186,6 +190,8 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
private final INamespacePathResolver namespacePathResolver;
private final INamespaceResolver namespaceResolver;
private IDiskCacheMonitoringService diskCacheService;
+ protected IExternalCredentialsCache externalCredentialsCache;
+ protected IExternalCredentialsCacheUpdater externalCredentialsCacheUpdater;
public NCAppRuntimeContext(INCServiceContext ncServiceContext,
NCExtensionManager extensionManager,
IPropertiesFactory propertiesFactory, INamespaceResolver
namespaceResolver,
@@ -210,6 +216,8 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
cacheManager = new CacheManager();
this.namespacePathResolver = namespacePathResolver;
this.namespaceResolver = namespaceResolver;
+ this.externalCredentialsCache = new ExternalCredentialsCache();
+ this.externalCredentialsCacheUpdater = new
ExternalCredentialsCacheUpdater(this);
}
@Override
@@ -748,4 +756,14 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
return isCloudDeployment() ?
storageProperties.getStoragePartitionsCount()
: ncServiceContext.getIoManager().getIODevices().size();
}
+
+    /**
+     * @return the external credentials cache created when this NC runtime context was constructed
+     */
+    @Override
+    public IExternalCredentialsCache getExternalCredentialsCache() {
+        return this.externalCredentialsCache;
+    }
+
+    /**
+     * @return the credentials cache updater created (bound to this context) when this NC runtime context was
+     *         constructed
+     */
+    @Override
+    public IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater() {
+        return this.externalCredentialsCacheUpdater;
+    }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index 589ee7904d..198b3ad81b 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -30,6 +30,7 @@ import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.cloud.WriterSingleBufferProvider;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -60,17 +61,17 @@ abstract class AbstractCloudExternalFileWriterFactory
implements IExternalFileWr
writeBufferSize = externalConfig.getWriteBufferSize();
}
- abstract ICloudClient createCloudClient() throws CompilationException;
+ abstract ICloudClient createCloudClient(IApplicationContext appCtx) throws
CompilationException;
abstract boolean isNoContainerFoundException(IOException e);
abstract boolean isSdkException(Throwable e);
- final void buildClient() throws HyracksDataException {
+ final void buildClient(IApplicationContext appCtx) throws
HyracksDataException {
try {
synchronized (this) {
if (cloudClient == null) {
- cloudClient = createCloudClient();
+ cloudClient = createCloudClient(appCtx);
}
}
} catch (CompilationException e) {
@@ -79,8 +80,8 @@ abstract class AbstractCloudExternalFileWriterFactory
implements IExternalFileWr
}
@Override
- public final void validate() throws AlgebricksException {
- ICloudClient testClient = createCloudClient();
+ public final void validate(IApplicationContext appCtx) throws
AlgebricksException {
+ ICloudClient testClient = createCloudClient(appCtx);
String bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
if (bucket == null || bucket.isEmpty()) {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 886f20d12d..63a8366c3c 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -24,6 +24,7 @@ import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.google.gcs.GCSUtils;
@@ -61,7 +62,7 @@ public final class GCSExternalFileWriterFactory extends
AbstractCloudExternalFil
}
@Override
- ICloudClient createCloudClient() throws CompilationException {
+ ICloudClient createCloudClient(IApplicationContext appCtx) throws
CompilationException {
GCSClientConfig config = GCSClientConfig.of(configuration,
writeBufferSize);
return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
@@ -80,7 +81,7 @@ public final class GCSExternalFileWriterFactory extends
AbstractCloudExternalFil
@Override
public IExternalFileWriter createWriter(IHyracksTaskContext context,
IExternalPrinterFactory printerFactory)
throws HyracksDataException {
- buildClient();
+ buildClient(((IApplicationContext)
context.getJobletContext().getServiceContext().getApplicationContext()));
String bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
IExternalPrinter printer = printerFactory.createPrinter();
IWarningCollector warningCollector = context.getWarningCollector();
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index e07acc0246..d268efd212 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -24,9 +24,10 @@ import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
@@ -61,9 +62,9 @@ public final class S3ExternalFileWriterFactory extends
AbstractCloudExternalFile
}
+ /**
+ * Builds the S3 client used by this writer. The application context is passed on to
+ * S3AuthUtils.buildAwsS3Client so that role-based (cross-account) configurations can reuse
+ * credentials held in the external credentials cache instead of assuming the role again.
+ * Called both from validate(appCtx) and, through buildClient(appCtx), from createWriter.
+ */
@Override
- ICloudClient createCloudClient() throws CompilationException {
+ ICloudClient createCloudClient(IApplicationContext appCtx) throws
CompilationException {
S3ClientConfig config = S3ClientConfig.of(configuration,
writeBufferSize);
- return new S3CloudClient(config,
S3Utils.buildAwsS3Client(configuration),
+ return new S3CloudClient(config, S3AuthUtils.buildAwsS3Client(appCtx,
configuration),
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
@@ -80,7 +81,7 @@ public final class S3ExternalFileWriterFactory extends
AbstractCloudExternalFile
@Override
public IExternalFileWriter createWriter(IHyracksTaskContext context,
IExternalPrinterFactory printerFactory)
throws HyracksDataException {
- buildClient();
+ buildClient(((IApplicationContext)
context.getJobletContext().getServiceContext().getApplicationContext()));
String bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
IExternalPrinter printer = printerFactory.createPrinter();
IWarningCollector warningCollector = context.getWarningCollector();
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index eabebf7b3a..19e4ad7ab7 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -29,6 +29,8 @@ import org.apache.asterix.common.config.NodeProperties;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -106,4 +108,14 @@ public interface IApplicationContext {
INamespaceResolver getNamespaceResolver();
INamespacePathResolver getNamespacePathResolver();
+
+ /**
+ * Returns the cache of temporary external credentials, e.g. the session credentials
+ * obtained by assuming an AWS role for cross-account access.
+ *
+ * @return external credentials cache
+ */
+ IExternalCredentialsCache getExternalCredentialsCache();
+
+ /**
+ * Returns the component that generates new external credentials and stores them in the
+ * external credentials cache; used when the cache holds no credentials for an entity.
+ *
+ * @return external credentials cache updater
+ */
+ IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater();
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 0cf6eb2736..35e0699224 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -314,6 +314,8 @@ public enum ErrorCode implements IError {
MAXIMUM_VALUE_ALLOWED_FOR_PARAM(1209),
STORAGE_SIZE_NOT_APPLICABLE_TO_TYPE(1210),
COULD_NOT_CREATE_TOKENS(1211),
+ NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION(1212),
+ FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION(1213),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
new file mode 100644
index 0000000000..245b350653
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import java.util.Map;
+
+public interface IExternalCredentialsCache {
+
+    /**
+     * Returns the cached credentials of the entity identified by the given configuration
+     * (see {@link #getName(Map)}). The returned object can be of any supported external
+     * credentials type; callers are expected to cast it to the type their source uses.
+     *
+     * @param configuration configuration containing external collection details
+     * @return credentials if present, null otherwise
+     */
+    Object getCredentials(Map<String, String> configuration);
+
+    /**
+     * Updates the credentials cache with the provided credentials for the entity identified by
+     * the given configuration (see {@link #getName(Map)}).
+     *
+     * @param configuration configuration containing external collection details
+     * @param credentials   credentials to cache, as key/value pairs
+     */
+    void updateCache(Map<String, String> configuration, Map<String, String> credentials);
+
+    /**
+     * Returns the name of the entity which the cached credentials belong to.
+     *
+     * @param configuration configuration containing external collection details
+     * @return name of entity which credentials belong to
+     */
+    String getName(Map<String, String> configuration);
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java
similarity index 61%
copy from
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
copy to
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java
index 4a75db6906..48553c0606 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java
@@ -16,13 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.writer;
+package org.apache.asterix.common.external;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IExternalCredentialsCacheUpdater {
-public interface IExternalWriterFactoryValidator {
/**
- * Perform the necessary validation to ensure the writer has the proper
permissions
+ * Generates new credentials and caches them
+ *
+ * @param configuration configuration containing external collection details
+ * @return the newly generated credentials; the concrete type depends on the external source
+ * (the S3 cross-account path casts it to AwsSessionCredentials)
+ * @throws HyracksDataException if the credentials could not be generated or cached (NOTE(review): confirm per implementation)
+ * @throws CompilationException if the configuration does not allow generating credentials (NOTE(review): confirm per implementation)
*/
- void validate() throws AlgebricksException;
+ Object generateAndCacheCredentials(Map<String, String> configuration)
+ throws HyracksDataException, CompilationException;
}
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index adb26fe585..d15a751b50 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -316,6 +316,8 @@
1209 = Maximum value allowed for '%1$s' is %2$s. Found %3$s
1210 = Retrieving storage size is not applicable to type: %1$s.
1211 = Could not create delegation tokens
+1212 = No credentials found for cross-account authentication. Expected
instance profile or access key id & secret access key for assuming role
+1213 = Failed to perform cross-account authentication. Encountered error :
'%1$s'
# Feed Errors
3001 = Illegal state.
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 7a1bdad9cb..138b364295 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -26,13 +26,14 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
@@ -48,14 +49,16 @@ import software.amazon.awssdk.services.s3.model.S3Exception;
public class AwsS3InputStream extends AbstractExternalInputStream {
// Configuration
+ private final IApplicationContext ncAppCtx;
private final String bucket;
private final S3Client s3Client;
private ResponseInputStream<?> s3InStream;
private static final int MAX_RETRIES = 5; // We will retry 5 times in case
of internal error from AWS S3 service
- public AwsS3InputStream(Map<String, String> configuration, List<String>
filePaths,
+ /**
+ * @param ncAppCtx application context used to resolve the S3 credentials (including cached
+ * cross-account credentials)
+ * @param configuration external collection configuration
+ * @param filePaths file paths assigned to this stream's partition
+ * @param valueEmbedder external filter value embedder, passed to the superclass
+ */
+ public AwsS3InputStream(IApplicationContext ncAppCtx, Map<String, String>
configuration, List<String> filePaths,
IExternalFilterValueEmbedder valueEmbedder) throws
HyracksDataException {
super(configuration, filePaths, valueEmbedder);
+ // must be assigned before buildAwsS3Client(...) below, which reads this.ncAppCtx
+ this.ncAppCtx = ncAppCtx;
this.s3Client = buildAwsS3Client(configuration);
this.bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
@@ -113,7 +116,7 @@ public class AwsS3InputStream extends
AbstractExternalInputStream {
}
+ /**
+ * Returns true if a failed S3 request may be retried: currentRetry is still below MAX_RETRIES
+ * and the error code is classified as retryable by S3AuthUtils.isRetryableError.
+ * NOTE(review): isRetryableError is not auth-related; confirm it is meant to live in S3AuthUtils.
+ */
private boolean shouldRetry(String errorCode, int currentRetry) {
- return currentRetry < MAX_RETRIES &&
S3Utils.isRetryableError(errorCode);
+ return currentRetry < MAX_RETRIES &&
S3AuthUtils.isRetryableError(errorCode);
}
@Override
@@ -141,7 +144,7 @@ public class AwsS3InputStream extends
AbstractExternalInputStream {
private S3Client buildAwsS3Client(Map<String, String> configuration)
throws HyracksDataException {
try {
- return S3Utils.buildAwsS3Client(configuration);
+ return S3AuthUtils.buildAwsS3Client(ncAppCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 36d21d1aaf..e9c72e31d2 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
@@ -47,8 +48,10 @@ public class AwsS3InputStreamFactory extends
AbstractExternalInputStreamFactory
public AsterixInputStream createInputStream(IExternalDataRuntimeContext
context) throws HyracksDataException {
+ // Creates a stream over the S3 objects assigned to this context's partition.
IExternalFilterValueEmbedder valueEmbedder =
context.getValueEmbedder();
int partition = context.getPartition();
- return new AwsS3InputStream(configuration,
partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
- valueEmbedder);
+ // the NC application context lets the stream resolve S3 credentials through the external
+ // credentials cache (used for role-based cross-account access)
+ IApplicationContext ncAppCtx = (IApplicationContext)
context.getTaskContext().getJobletContext()
+ .getServiceContext().getApplicationContext();
+ return new AwsS3InputStream(ncAppCtx, configuration,
+ partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
valueEmbedder);
}
@Override
@@ -65,7 +68,8 @@ public class AwsS3InputStreamFactory extends
AbstractExternalInputStreamFactory
configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME,
externalDataPrefix.getRoot());
// get the items
- List<S3Object> filesOnly = S3Utils.listS3Objects(configuration,
includeExcludeMatcher, warningCollector,
+ IApplicationContext appCtx = (IApplicationContext)
ctx.getApplicationContext();
+ List<S3Object> filesOnly = S3Utils.listS3Objects(appCtx,
configuration, includeExcludeMatcher, warningCollector,
externalDataPrefix, evaluator);
// Distribute work load amongst the partitions
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index abed33afcb..7ddbab914e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.external.input.record.reader.aws.parquet;
-import static
org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf;
+import static
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
import static org.apache.asterix.external.util.aws.s3.S3Utils.listS3Objects;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -73,7 +74,8 @@ public class AwsS3ParquetReaderFactory extends
HDFSDataSourceFactory {
configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME,
externalDataPrefix.getRoot());
String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- List<S3Object> filesOnly = listS3Objects(configuration,
includeExcludeMatcher, warningCollector,
+ IApplicationContext appCtx = (IApplicationContext)
serviceCtx.getApplicationContext();
+ List<S3Object> filesOnly = listS3Objects(appCtx, configuration,
includeExcludeMatcher, warningCollector,
externalDataPrefix, evaluator);
path = buildPathURIs(container, filesOnly);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index e758f64f88..1de2cd252c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -39,6 +39,7 @@ public class ExternalDataConstants {
// used to specify the stream factory for an adapter that has a stream
data source
public static final String KEY_STREAM = "stream";
//TODO(DB): check adapter configuration
+ public static final String KEY_DATASET = "dataset";
public static final String KEY_DATASET_DATABASE = "dataset-database";
// used to specify the dataverse of the adapter
public static final String KEY_DATASET_DATAVERSE = "dataset-dataverse";
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 8c72a8c4c3..1d5f2ed0d9 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -74,8 +74,8 @@ import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna
import org.apache.asterix.external.library.JavaLibrary;
import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;
import org.apache.asterix.external.util.google.gcs.GCSConstants;
import org.apache.asterix.om.types.ARecordType;
@@ -675,7 +675,7 @@ public class ExternalDataUtils {
switch (type) {
case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
- S3Utils.validateProperties(configuration, srcLoc, collector);
+ S3AuthUtils.validateProperties(appCtx, configuration, srcLoc,
collector);
break;
case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
validateAzureBlobProperties(configuration, srcLoc, collector,
appCtx);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
similarity index 57%
copy from
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
copy to
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 3cfccb47e2..2ba1844a86 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -51,22 +51,18 @@ import static
org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
-import java.util.function.BiPredicate;
-import java.util.regex.Matcher;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.external.IExternalFilterEvaluator;
-import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
-import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.mapred.JobConf;
@@ -87,22 +83,17 @@ import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.s3.model.CommonPrefix;
-import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Exception;
-import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.S3Response;
-import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
import software.amazon.awssdk.services.sts.model.Credentials;
-public class S3Utils {
- private S3Utils() {
+public class S3AuthUtils {
+ private S3AuthUtils() {
throw new AssertionError("do not instantiate");
}
@@ -117,15 +108,17 @@ public class S3Utils {
* @return S3 client
* @throws CompilationException CompilationException
*/
- public static S3Client buildAwsS3Client(Map<String, String> configuration)
throws CompilationException {
+ public static S3Client buildAwsS3Client(IApplicationContext appCtx,
Map<String, String> configuration)
+ throws CompilationException {
String regionId = configuration.get(REGION_FIELD_NAME);
String serviceEndpoint =
configuration.get(SERVICE_END_POINT_FIELD_NAME);
Region region = validateAndGetRegion(regionId);
- AwsCredentialsProvider credentialsProvider =
buildCredentialsProvider(configuration);
+ AwsCredentialsProvider credentialsProvider =
buildCredentialsProvider(appCtx, configuration);
S3ClientBuilder builder = S3Client.builder();
builder.region(region);
+ builder.crossRegionAccessEnabled(true);
builder.credentialsProvider(credentialsProvider);
// Validate the service endpoint if present
@@ -146,8 +139,8 @@ public class S3Utils {
return builder.build();
}
- public static AwsCredentialsProvider buildCredentialsProvider(Map<String,
String> configuration)
- throws CompilationException {
+ public static AwsCredentialsProvider
buildCredentialsProvider(IApplicationContext appCtx,
+ Map<String, String> configuration) throws CompilationException {
String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
String instanceProfile =
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
@@ -157,12 +150,11 @@ public class S3Utils {
if (noAuth(configuration)) {
return AnonymousCredentialsProvider.create();
} else if (arnRole != null) {
- // TODO: Do auth validation and use existing credentials if exist
already, if not, assume the role
- return validateAndGetTrustAccountAuthentication(configuration);
+ return getTrustAccountCredentials(appCtx, configuration);
} else if (instanceProfile != null) {
- return validateAndGetInstanceProfileAuthentication(configuration);
+ return getInstanceProfileCredentials(configuration);
} else if (accessKeyId != null || secretAccessKey != null) {
- return validateAndGetAccessKeysAuthentications(configuration);
+ return getAccessKeyCredentials(configuration);
} else {
if (externalId != null) {
throw new
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
@@ -174,6 +166,166 @@ public class S3Utils {
}
}
+    /**
+     * Resolves a region id against the regions listed in the S3 service metadata.
+     *
+     * @param regionId region id taken from the configuration
+     * @return the first region whose id equals {@code regionId}
+     * @throws CompilationException if no S3 region has the given id
+     */
+    public static Region validateAndGetRegion(String regionId) throws CompilationException {
+        for (Region candidate : S3Client.serviceMetadata().regions()) {
+            if (candidate.id().equals(regionId)) {
+                return candidate;
+            }
+        }
+        throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
+    }
+
+    /**
+     * Returns true when none of the authentication-related parameters is set, in which case the
+     * caller falls back to anonymous access.
+     */
+    private static boolean noAuth(Map<String, String> configuration) {
+        String firstAuthParam = getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, ROLE_ARN_FIELD_NAME,
+                EXTERNAL_ID_FIELD_NAME, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
+                SESSION_TOKEN_FIELD_NAME);
+        return firstAuthParam == null;
+    }
+
+    /**
+     * Returns a provider for cross-account (assumed role) credentials. The cached credentials are
+     * reused when the cache has them; otherwise new credentials are generated by assuming the role
+     * and stored in the cache.
+     * <p>
+     * NOTE(review): the returned provider always hands out the same credentials, so a client built
+     * from it does not pick up credentials that are refreshed later.
+     *
+     * @param appCtx        application context providing the credentials cache and its updater
+     * @param configuration external collection configuration
+     * @return provider returning the cached or newly generated session credentials
+     * @throws CompilationException if new credentials could not be generated
+     */
+    public static AwsCredentialsProvider getTrustAccountCredentials(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
+        Object cachedCredentials = cache.getCredentials(configuration);
+        if (cachedCredentials != null) {
+            // cast eagerly so a wrongly-typed cache entry fails here rather than inside the SDK
+            return StaticCredentialsProvider.create((AwsSessionCredentials) cachedCredentials);
+        }
+
+        IExternalCredentialsCacheUpdater cacheUpdater = appCtx.getExternalCredentialsCacheUpdater();
+        AwsSessionCredentials credentials;
+        try {
+            credentials = (AwsSessionCredentials) cacheUpdater.generateAndCacheCredentials(configuration);
+        } catch (HyracksDataException ex) {
+            throw new CompilationException(ErrorCode.FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION, ex,
+                    ex.getMessage());
+        }
+        // StaticCredentialsProvider rejects null credentials up front, unlike a bare lambda
+        return StaticCredentialsProvider.create(credentials);
+    }
+
+    /**
+     * Assumes the role named by the configuration's role ARN and returns the resulting temporary
+     * credentials. STS is called with the instance profile when it is set to "true", otherwise with
+     * the configured access key id and secret access key; a 900-second session is requested.
+     *
+     * @param configuration external collection configuration
+     * @return provider returning the session credentials obtained from the assumed role
+     * @throws CompilationException if the region is not supported, no credentials are available to
+     *                              assume the role, or the STS call fails
+     */
+    public static AwsCredentialsProvider assumeRoleAndGetCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        Region region = validateAndGetRegion(configuration.get(REGION_FIELD_NAME));
+
+        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
+        AssumeRoleRequest.Builder requestBuilder = AssumeRoleRequest.builder()
+                .roleArn(configuration.get(ROLE_ARN_FIELD_NAME))
+                .roleSessionName(UUID.randomUUID().toString())
+                // TODO(htowaileb): configurable? Can be 900 to 43200 (15 mins to 12 hours)
+                .durationSeconds(900);
+        if (externalId != null) {
+            requestBuilder.externalId(externalId);
+        }
+        AssumeRoleRequest request = requestBuilder.build();
+
+        // credentials used to call STS and assume the role
+        boolean useInstanceProfile = "true".equalsIgnoreCase(configuration.get(INSTANCE_PROFILE_FIELD_NAME));
+        boolean hasAccessKeys = configuration.get(ACCESS_KEY_ID_FIELD_NAME) != null
+                && configuration.get(SECRET_ACCESS_KEY_FIELD_NAME) != null;
+        AwsCredentialsProvider sourceCredentials;
+        if (useInstanceProfile) {
+            sourceCredentials = getInstanceProfileCredentials(configuration, true);
+        } else if (hasAccessKeys) {
+            sourceCredentials = getAccessKeyCredentials(configuration, true);
+        } else {
+            throw new CompilationException(
+                    ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
+        }
+
+        try (StsClient stsClient =
+                StsClient.builder().region(region).credentialsProvider(sourceCredentials).build()) {
+            AssumeRoleResponse response = stsClient.assumeRole(request);
+            Credentials assumed = response.credentials();
+            AwsSessionCredentials sessionCredentials = AwsSessionCredentials.create(assumed.accessKeyId(),
+                    assumed.secretAccessKey(), assumed.sessionToken());
+            return StaticCredentialsProvider.create(sessionCredentials);
+        } catch (SdkException ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+        }
+    }
+
+    private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        // direct use (not for assuming a role): access key parameters must not be present as well
+        return getInstanceProfileCredentials(configuration, false);
+    }
+
+    /**
+     * Returns the SDK's InstanceProfileCredentialsProvider after validating the configuration.
+     *
+     * @param configuration            external collection configuration
+     * @param assumeRoleAuthentication true when the provider is only used to assume a role, in which
+     *                                 case access key parameters may also be present
+     * @return instance profile credentials provider
+     * @throws CompilationException if the instance profile value is not "true", or (outside
+     *                              assume-role authentication) access key parameters are also set
+     */
+    private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration,
+            boolean assumeRoleAuthentication) throws CompilationException {
+        if (!"true".equalsIgnoreCase(configuration.get(INSTANCE_PROFILE_FIELD_NAME))) {
+            // "true" is the only accepted value
+            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INSTANCE_PROFILE_FIELD_NAME, "true");
+        }
+        String conflicting = assumeRoleAuthentication ? null
+                : getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
+                        SESSION_TOKEN_FIELD_NAME);
+        if (conflicting != null) {
+            throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, conflicting,
+                    INSTANCE_PROFILE_FIELD_NAME);
+        }
+        return InstanceProfileCredentialsProvider.create();
+    }
+
+    private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        return getAccessKeyCredentials(configuration, false);
+    }
+
+    /**
+     * Returns a static provider for the configured access key id and secret access key, including
+     * the session token when one is provided.
+     *
+     * @param configuration            external collection configuration
+     * @param assumeRoleAuthentication true when the keys are only used to assume a role, in which case
+     *                                 instance profile and external id parameters may also be present
+     * @return static provider of session credentials if a session token is set, basic credentials otherwise
+     * @throws CompilationException if either key is missing, or (outside assume-role authentication) an
+     *                              instance profile or external id is also set
+     */
+    private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration,
+            boolean assumeRoleAuthentication) throws CompilationException {
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+
+        if (accessKeyId == null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
+                    SECRET_ACCESS_KEY_FIELD_NAME);
+        }
+        if (secretAccessKey == null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
+                    ACCESS_KEY_ID_FIELD_NAME);
+        }
+
+        if (!assumeRoleAuthentication) {
+            String notAllowed = getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, EXTERNAL_ID_FIELD_NAME);
+            if (notAllowed != null) {
+                // the parameter actually present here is the access key id, not the instance profile
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+                        ACCESS_KEY_ID_FIELD_NAME);
+            }
+        }
+
+        // use session token if provided
+        if (sessionToken != null) {
+            return StaticCredentialsProvider
+                    .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
+        }
+        return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
+    }
+
+    /**
+     * Returns the first of the given field names whose value in the configuration is non-null.
+     *
+     * @param configuration configuration to inspect
+     * @param fieldNames    field names to check, in order
+     * @return the first field name with a non-null value, or null if none has one
+     */
+    private static String getNonNull(Map<String, String> configuration, String... fieldNames) {
+        String found = null;
+        for (int i = 0; i < fieldNames.length && found == null; i++) {
+            if (configuration.get(fieldNames[i]) != null) {
+                found = fieldNames[i];
+            }
+        }
+        return found;
+    }
+
/**
* Builds the S3 client using the provided configuration
*
@@ -235,8 +387,8 @@ public class S3Utils {
* @param configuration properties
* @throws CompilationException Compilation exception
*/
- public static void validateProperties(Map<String, String> configuration,
SourceLocation srcLoc,
- IWarningCollector collector) throws CompilationException {
+ public static void validateProperties(IApplicationContext appCtx,
Map<String, String> configuration,
+ SourceLocation srcLoc, IWarningCollector collector) throws
CompilationException {
if (isDeltaTable(configuration)) {
validateDeltaTableProperties(configuration);
}
@@ -251,12 +403,7 @@ public class S3Utils {
String secretAccessKey =
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
if (arnRole != null) {
- String notAllowed = getNonNull(configuration,
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- if (notAllowed != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
+ return;
} else if (externalId != null) {
throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
ROLE_ARN_FIELD_NAME,
EXTERNAL_ID_FIELD_NAME);
@@ -280,21 +427,21 @@ public class S3Utils {
}
// Check if the bucket is present
- S3Client s3Client = buildAwsS3Client(configuration);
+ S3Client s3Client = buildAwsS3Client(appCtx, configuration);
S3Response response;
boolean useOldApi = false;
String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
String prefix = getPrefix(configuration);
try {
- response = isBucketEmpty(s3Client, container, prefix, false);
+ response = S3Utils.isBucketEmpty(s3Client, container, prefix,
false);
} catch (S3Exception ex) {
// Method not implemented, try falling back to old API
try {
// For error code, see
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if
(ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
useOldApi = true;
- response = isBucketEmpty(s3Client, container, prefix,
true);
+ response = S3Utils.isBucketEmpty(s3Client, container,
prefix, true);
} else {
throw ex;
}
@@ -322,349 +469,4 @@ public class S3Utils {
throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
}
}
-
- /**
- * Checks for a single object in the specified bucket to determine if the
bucket is empty or not.
- *
- * @param s3Client s3 client
- * @param container the container name
- * @param prefix Prefix to be used
- * @param useOldApi flag whether to use the old API or not
- * @return returns the S3 response
- */
- private static S3Response isBucketEmpty(S3Client s3Client, String
container, String prefix, boolean useOldApi) {
- S3Response response;
- if (useOldApi) {
- ListObjectsRequest.Builder listObjectsBuilder =
ListObjectsRequest.builder();
- listObjectsBuilder.prefix(prefix);
- response =
s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build());
- } else {
- ListObjectsV2Request.Builder listObjectsBuilder =
ListObjectsV2Request.builder();
- listObjectsBuilder.prefix(prefix);
- response =
s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
- }
- return response;
- }
-
- /**
- * Returns the lists of S3 objects.
- *
- * @param configuration properties
- * @param includeExcludeMatcher include/exclude matchers to apply
- */
- public static List<S3Object> listS3Objects(Map<String, String>
configuration,
- AbstractExternalInputStreamFactory.IncludeExcludeMatcher
includeExcludeMatcher,
- IWarningCollector warningCollector, ExternalDataPrefix
externalDataPrefix,
- IExternalFilterEvaluator evaluator) throws CompilationException,
HyracksDataException {
- // Prepare to retrieve the objects
- List<S3Object> filesOnly;
- String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- S3Client s3Client = buildAwsS3Client(configuration);
- String prefix = getPrefix(configuration);
-
- try {
- filesOnly = listS3Objects(s3Client, container, prefix,
includeExcludeMatcher, externalDataPrefix, evaluator,
- warningCollector);
- } catch (S3Exception ex) {
- // New API is not implemented, try falling back to old API
- try {
- // For error code, see
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- if
(ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
- filesOnly = oldApiListS3Objects(s3Client, container,
prefix, includeExcludeMatcher,
- externalDataPrefix, evaluator, warningCollector);
- } else {
- throw ex;
- }
- } catch (SdkException ex2) {
- throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2,
getMessageOrToString(ex));
- }
- } catch (SdkException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, getMessageOrToString(ex));
- } finally {
- if (s3Client != null) {
- CleanupUtils.close(s3Client, null);
- }
- }
-
- // Warn if no files are returned
- if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
- Warning warning = Warning.of(null,
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- warningCollector.warn(warning);
- }
-
- return filesOnly;
- }
-
- /**
- * Uses the latest API to retrieve the objects from the storage.
- *
- * @param s3Client S3 client
- * @param container container name
- * @param prefix definition prefix
- * @param includeExcludeMatcher include/exclude matchers to apply
- */
- private static List<S3Object> listS3Objects(S3Client s3Client, String
container, String prefix,
- AbstractExternalInputStreamFactory.IncludeExcludeMatcher
includeExcludeMatcher,
- ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator
evaluator,
- IWarningCollector warningCollector) throws HyracksDataException {
- String newMarker = null;
- List<S3Object> filesOnly = new ArrayList<>();
-
- ListObjectsV2Response listObjectsResponse;
- ListObjectsV2Request.Builder listObjectsBuilder =
ListObjectsV2Request.builder().bucket(container);
- listObjectsBuilder.prefix(prefix);
-
- while (true) {
- // List the objects from the start, or from the last marker in
case of truncated result
- if (newMarker == null) {
- listObjectsResponse =
s3Client.listObjectsV2(listObjectsBuilder.build());
- } else {
- listObjectsResponse =
s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
- }
-
- // Collect the paths to files only
- collectAndFilterFiles(listObjectsResponse.contents(),
includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly,
externalDataPrefix, evaluator,
- warningCollector);
-
- // Mark the flag as done if done, otherwise, get the marker of the
previous response for the next request
- if (listObjectsResponse.isTruncated() != null &&
listObjectsResponse.isTruncated()) {
- newMarker = listObjectsResponse.nextContinuationToken();
- } else {
- break;
- }
- }
-
- return filesOnly;
- }
-
- /**
- * Uses the old API (in case the new API is not implemented) to retrieve
the objects from the storage
- *
- * @param s3Client S3 client
- * @param container container name
- * @param prefix definition prefix
- * @param includeExcludeMatcher include/exclude matchers to apply
- */
- private static List<S3Object> oldApiListS3Objects(S3Client s3Client,
String container, String prefix,
- AbstractExternalInputStreamFactory.IncludeExcludeMatcher
includeExcludeMatcher,
- ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator
evaluator,
- IWarningCollector warningCollector) throws HyracksDataException {
- String newMarker = null;
- List<S3Object> filesOnly = new ArrayList<>();
-
- ListObjectsResponse listObjectsResponse;
- ListObjectsRequest.Builder listObjectsBuilder =
ListObjectsRequest.builder().bucket(container);
- listObjectsBuilder.prefix(prefix);
-
- while (true) {
- // List the objects from the start, or from the last marker in
case of truncated result
- if (newMarker == null) {
- listObjectsResponse =
s3Client.listObjects(listObjectsBuilder.build());
- } else {
- listObjectsResponse =
s3Client.listObjects(listObjectsBuilder.marker(newMarker).build());
- }
-
- // Collect the paths to files only
- collectAndFilterFiles(listObjectsResponse.contents(),
includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly,
externalDataPrefix, evaluator,
- warningCollector);
-
- // Mark the flag as done if done, otherwise, get the marker of the
previous response for the next request
- if (listObjectsResponse.isTruncated() != null &&
listObjectsResponse.isTruncated()) {
- newMarker = listObjectsResponse.nextMarker();
- } else {
- break;
- }
- }
-
- return filesOnly;
- }
-
- /**
- * Collects only files that pass all tests
- *
- * @param s3Objects s3 objects
- * @param predicate predicate
- * @param matchers matchers
- * @param filesOnly filtered files
- * @param externalDataPrefix external data prefix
- * @param evaluator evaluator
- */
- private static void collectAndFilterFiles(List<S3Object> s3Objects,
BiPredicate<List<Matcher>, String> predicate,
- List<Matcher> matchers, List<S3Object> filesOnly,
ExternalDataPrefix externalDataPrefix,
- IExternalFilterEvaluator evaluator, IWarningCollector
warningCollector) throws HyracksDataException {
- for (S3Object object : s3Objects) {
- if (ExternalDataUtils.evaluate(object.key(), predicate, matchers,
externalDataPrefix, evaluator,
- warningCollector)) {
- filesOnly.add(object);
- }
- }
- }
-
- public static Map<String, List<String>> S3ObjectsOfSingleDepth(Map<String,
String> configuration, String container,
- String prefix) throws CompilationException {
- // create s3 client
- S3Client s3Client = buildAwsS3Client(configuration);
- // fetch all the s3 objects
- return listS3ObjectsOfSingleDepth(s3Client, container, prefix);
- }
-
- /**
- * Uses the latest API to retrieve the objects from the storage of a
single level.
- *
- * @param s3Client S3 client
- * @param container container name
- * @param prefix definition prefix
- */
- private static Map<String, List<String>>
listS3ObjectsOfSingleDepth(S3Client s3Client, String container,
- String prefix) {
- Map<String, List<String>> allObjects = new HashMap<>();
- ListObjectsV2Iterable listObjectsInterable;
- ListObjectsV2Request.Builder listObjectsBuilder =
-
ListObjectsV2Request.builder().bucket(container).prefix(prefix).delimiter("/");
- listObjectsBuilder.prefix(prefix);
- List<String> files = new ArrayList<>();
- List<String> folders = new ArrayList<>();
- // to skip the prefix as a file from the response
- boolean checkPrefixInFile = true;
- listObjectsInterable =
s3Client.listObjectsV2Paginator(listObjectsBuilder.build());
- for (ListObjectsV2Response response : listObjectsInterable) {
- // put all the files
- for (S3Object object : response.contents()) {
- String fileName = object.key();
- fileName = fileName.substring(prefix.length(),
fileName.length());
- if (checkPrefixInFile) {
- if (prefix.equals(object.key()))
- checkPrefixInFile = false;
- else {
- files.add(fileName);
- }
- } else {
- files.add(fileName);
- }
- }
- // put all the folders
- for (CommonPrefix object : response.commonPrefixes()) {
- String folderName = object.prefix();
- folderName = folderName.substring(prefix.length(),
folderName.length());
- folders.add(folderName.endsWith("/") ? folderName.substring(0,
folderName.length() - 1) : folderName);
- }
- }
- allObjects.put("files", files);
- allObjects.put("folders", folders);
- return allObjects;
- }
-
- public static Region validateAndGetRegion(String regionId) throws
CompilationException {
- List<Region> regions = S3Client.serviceMetadata().regions();
- Optional<Region> selectedRegion = regions.stream().filter(region ->
region.id().equals(regionId)).findFirst();
-
- if (selectedRegion.isEmpty()) {
- throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
- }
- return selectedRegion.get();
- }
-
- // TODO(htowaileb): Currently, trust-account is always assuming we have
instance profile setup in place
- private static AwsCredentialsProvider
validateAndGetTrustAccountAuthentication(Map<String, String> configuration)
- throws CompilationException {
- String notAllowed = getNonNull(configuration,
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- if (notAllowed != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
-
- String regionId = configuration.get(REGION_FIELD_NAME);
- String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
- String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
- Region region = validateAndGetRegion(regionId);
-
- AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder();
- builder.roleArn(arnRole);
- builder.roleSessionName(UUID.randomUUID().toString());
- builder.durationSeconds(900); // minimum role assume duration = 900
seconds (15 minutes), make configurable?
- if (externalId != null) {
- builder.externalId(externalId);
- }
- AssumeRoleRequest request = builder.build();
- AwsCredentialsProvider credentialsProvider =
validateAndGetInstanceProfileAuthentication(configuration);
-
- // TODO(htowaileb): We shouldn't assume role with each request, rather
stored the received temporary credentials
- // and refresh when expired.
- // assume the role from the provided arn
- try (StsClient stsClient =
-
StsClient.builder().region(region).credentialsProvider(credentialsProvider).build())
{
- AssumeRoleResponse response = stsClient.assumeRole(request);
- Credentials credentials = response.credentials();
- return
StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(),
- credentials.secretAccessKey(),
credentials.sessionToken()));
- } catch (SdkException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, getMessageOrToString(ex));
- }
- }
-
- private static AwsCredentialsProvider
validateAndGetInstanceProfileAuthentication(Map<String, String> configuration)
- throws CompilationException {
- String instanceProfile =
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-
- // only "true" value is allowed
- if (!"true".equalsIgnoreCase(instanceProfile)) {
- throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE,
INSTANCE_PROFILE_FIELD_NAME, "true");
- }
-
- String notAllowed = getNonNull(configuration,
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- if (notAllowed != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
- return InstanceProfileCredentialsProvider.create();
- }
-
- private static AwsCredentialsProvider
validateAndGetAccessKeysAuthentications(Map<String, String> configuration)
- throws CompilationException {
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey =
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
-
- // accessKeyId authentication
- if (accessKeyId == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
ACCESS_KEY_ID_FIELD_NAME,
- SECRET_ACCESS_KEY_FIELD_NAME);
- }
- if (secretAccessKey == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
SECRET_ACCESS_KEY_FIELD_NAME,
- ACCESS_KEY_ID_FIELD_NAME);
- }
-
- String notAllowed = getNonNull(configuration, EXTERNAL_ID_FIELD_NAME);
- if (notAllowed != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
-
- // use session token if provided
- if (sessionToken != null) {
- return StaticCredentialsProvider
- .create(AwsSessionCredentials.create(accessKeyId,
secretAccessKey, sessionToken));
- } else {
- return
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId,
secretAccessKey));
- }
- }
-
- private static boolean noAuth(Map<String, String> configuration) {
- return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME,
ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
- ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
SESSION_TOKEN_FIELD_NAME) == null;
- }
-
- private static String getNonNull(Map<String, String> configuration,
String... fieldNames) {
- for (String fieldName : fieldNames) {
- if (configuration.get(fieldName) != null) {
- return fieldName;
- }
- }
- return null;
- }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 3cfccb47e2..a2b50e1fc0 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -18,48 +18,18 @@
*/
package org.apache.asterix.external.util.aws.s3;
-import static
org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
-import static
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
-import static
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
-import static
org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
-import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
-import static
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
-import static
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.EXTERNAL_ID_FIELD_NAME;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS_ACCESS;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_PROTOCOL;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMP_ACCESS;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.INSTANCE_PROFILE_FIELD_NAME;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.REGION_FIELD_NAME;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.ROLE_ARN_FIELD_NAME;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.SESSION_TOKEN_FIELD_NAME;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -67,26 +37,13 @@ import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.api.util.CleanupUtils;
-import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.CommonPrefix;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
@@ -96,233 +53,12 @@ import
software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.S3Response;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
-import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
-import software.amazon.awssdk.services.sts.model.Credentials;
public class S3Utils {
private S3Utils() {
throw new AssertionError("do not instantiate");
}
- public static boolean isRetryableError(String errorCode) {
- return errorCode.equals(ERROR_INTERNAL_ERROR) ||
errorCode.equals(ERROR_SLOW_DOWN);
- }
-
- /**
- * Builds the S3 client using the provided configuration
- *
- * @param configuration properties
- * @return S3 client
- * @throws CompilationException CompilationException
- */
- public static S3Client buildAwsS3Client(Map<String, String> configuration)
throws CompilationException {
- String regionId = configuration.get(REGION_FIELD_NAME);
- String serviceEndpoint =
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
- Region region = validateAndGetRegion(regionId);
- AwsCredentialsProvider credentialsProvider =
buildCredentialsProvider(configuration);
-
- S3ClientBuilder builder = S3Client.builder();
- builder.region(region);
- builder.credentialsProvider(credentialsProvider);
-
- // Validate the service endpoint if present
- if (serviceEndpoint != null) {
- try {
- URI uri = new URI(serviceEndpoint);
- try {
- builder.endpointOverride(uri);
- } catch (NullPointerException ex) {
- throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
getMessageOrToString(ex));
- }
- } catch (URISyntaxException ex) {
- throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
- String.format("Invalid service endpoint %s",
serviceEndpoint));
- }
- }
-
- return builder.build();
- }
-
- public static AwsCredentialsProvider buildCredentialsProvider(Map<String,
String> configuration)
- throws CompilationException {
- String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
- String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
- String instanceProfile =
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey =
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
- if (noAuth(configuration)) {
- return AnonymousCredentialsProvider.create();
- } else if (arnRole != null) {
- // TODO: Do auth validation and use existing credentials if exist
already, if not, assume the role
- return validateAndGetTrustAccountAuthentication(configuration);
- } else if (instanceProfile != null) {
- return validateAndGetInstanceProfileAuthentication(configuration);
- } else if (accessKeyId != null || secretAccessKey != null) {
- return validateAndGetAccessKeysAuthentications(configuration);
- } else {
- if (externalId != null) {
- throw new
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
- EXTERNAL_ID_FIELD_NAME);
- } else {
- throw new
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
ACCESS_KEY_ID_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- }
- }
- }
-
- /**
- * Builds the S3 client using the provided configuration
- *
- * @param configuration properties
- * @param numberOfPartitions number of partitions in the cluster
- */
- public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String,
String> configuration,
- int numberOfPartitions) {
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey =
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
- String serviceEndpoint =
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
- //Disable caching S3 FileSystem
- HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
-
- /*
- * Authentication Methods:
- * 1- Anonymous: no accessKeyId and no secretAccessKey
- * 2- Temporary: has to provide accessKeyId, secretAccessKey and
sessionToken
- * 3- Private: has to provide accessKeyId and secretAccessKey
- */
- if (accessKeyId == null) {
- //Tells hadoop-aws it is an anonymous access
- conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
- } else {
- conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
- conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
- if (sessionToken != null) {
- conf.set(HADOOP_SESSION_TOKEN, sessionToken);
- //Tells hadoop-aws it is a temporary access
- conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
- }
- }
-
- /*
- * This is to allow S3 definition to have path-style form. Should
always be true to match the current
- * way we access files in S3
- */
- conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
-
- /*
- * Set the size of S3 connection pool to be the number of partitions
- */
- conf.set(HADOOP_S3_CONNECTION_POOL_SIZE,
String.valueOf(numberOfPartitions));
-
- if (serviceEndpoint != null) {
- // Validation of the URL should be done at hadoop-aws level
- conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
- } else {
- //Region is ignored and buckets could be found by the central
endpoint
- conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
- }
- }
-
- /**
- * Validate external dataset properties
- *
- * @param configuration properties
- * @throws CompilationException Compilation exception
- */
- public static void validateProperties(Map<String, String> configuration,
SourceLocation srcLoc,
- IWarningCollector collector) throws CompilationException {
- if (isDeltaTable(configuration)) {
- validateDeltaTableProperties(configuration);
- }
- // check if the format property is present
- else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
srcLoc, ExternalDataConstants.KEY_FORMAT);
- }
-
- String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
- String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey =
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
- if (arnRole != null) {
- String notAllowed = getNonNull(configuration,
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- if (notAllowed != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
- } else if (externalId != null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
ROLE_ARN_FIELD_NAME,
- EXTERNAL_ID_FIELD_NAME);
- } else if (accessKeyId == null || secretAccessKey == null) {
- // If one is passed, the other is required
- if (accessKeyId != null) {
- throw new
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
SECRET_ACCESS_KEY_FIELD_NAME,
- ACCESS_KEY_ID_FIELD_NAME);
- } else if (secretAccessKey != null) {
- throw new
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
ACCESS_KEY_ID_FIELD_NAME,
- SECRET_ACCESS_KEY_FIELD_NAME);
- }
- }
-
- validateIncludeExclude(configuration);
- try {
- // TODO(htowaileb): maybe something better, this will check to
ensure type is supported before creation
- new ExternalDataPrefix(configuration);
- } catch (AlgebricksException ex) {
- throw new
CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
- }
-
- // Check if the bucket is present
- S3Client s3Client = buildAwsS3Client(configuration);
- S3Response response;
- boolean useOldApi = false;
- String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- String prefix = getPrefix(configuration);
-
- try {
- response = isBucketEmpty(s3Client, container, prefix, false);
- } catch (S3Exception ex) {
- // Method not implemented, try falling back to old API
- try {
- // For error code, see
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- if
(ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
- useOldApi = true;
- response = isBucketEmpty(s3Client, container, prefix,
true);
- } else {
- throw ex;
- }
- } catch (SdkException ex2) {
- throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2,
getMessageOrToString(ex));
- }
- } catch (SdkException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, getMessageOrToString(ex));
- } finally {
- if (s3Client != null) {
- CleanupUtils.close(s3Client, null);
- }
- }
-
- boolean isEmpty = useOldApi ? ((ListObjectsResponse)
response).contents().isEmpty()
- : ((ListObjectsV2Response) response).contents().isEmpty();
- if (isEmpty && collector.shouldWarn()) {
- Warning warning = Warning.of(srcLoc,
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- collector.warn(warning);
- }
-
- // Returns 200 only in case the bucket exists, otherwise, throws an
exception. However, to
- // ensure coverage, check if the result is successful as well and not
only catch exceptions
- if (!response.sdkHttpResponse().isSuccessful()) {
- throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
- }
- }
-
/**
* Checks for a single object in the specified bucket to determine if the
bucket is empty or not.
*
@@ -332,7 +68,7 @@ public class S3Utils {
* @param useOldApi flag whether to use the old API or not
* @return returns the S3 response
*/
- private static S3Response isBucketEmpty(S3Client s3Client, String
container, String prefix, boolean useOldApi) {
+ protected static S3Response isBucketEmpty(S3Client s3Client, String
container, String prefix, boolean useOldApi) {
S3Response response;
if (useOldApi) {
ListObjectsRequest.Builder listObjectsBuilder =
ListObjectsRequest.builder();
@@ -352,14 +88,14 @@ public class S3Utils {
* @param configuration properties
* @param includeExcludeMatcher include/exclude matchers to apply
*/
- public static List<S3Object> listS3Objects(Map<String, String>
configuration,
+ public static List<S3Object> listS3Objects(IApplicationContext appCtx,
Map<String, String> configuration,
AbstractExternalInputStreamFactory.IncludeExcludeMatcher
includeExcludeMatcher,
IWarningCollector warningCollector, ExternalDataPrefix
externalDataPrefix,
IExternalFilterEvaluator evaluator) throws CompilationException,
HyracksDataException {
// Prepare to retrieve the objects
List<S3Object> filesOnly;
String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- S3Client s3Client = buildAwsS3Client(configuration);
+ S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx,
configuration);
String prefix = getPrefix(configuration);
try {
@@ -502,10 +238,11 @@ public class S3Utils {
}
}
- public static Map<String, List<String>> S3ObjectsOfSingleDepth(Map<String,
String> configuration, String container,
- String prefix) throws CompilationException {
+ public static Map<String, List<String>>
S3ObjectsOfSingleDepth(IApplicationContext appCtx,
+ Map<String, String> configuration, String container, String prefix)
+ throws CompilationException, HyracksDataException {
// create s3 client
- S3Client s3Client = buildAwsS3Client(configuration);
+ S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx,
configuration);
// fetch all the s3 objects
return listS3ObjectsOfSingleDepth(s3Client, container, prefix);
}
@@ -555,116 +292,4 @@ public class S3Utils {
allObjects.put("folders", folders);
return allObjects;
}
-
- public static Region validateAndGetRegion(String regionId) throws
CompilationException {
- List<Region> regions = S3Client.serviceMetadata().regions();
- Optional<Region> selectedRegion = regions.stream().filter(region ->
region.id().equals(regionId)).findFirst();
-
- if (selectedRegion.isEmpty()) {
- throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
- }
- return selectedRegion.get();
- }
-
- // TODO(htowaileb): Currently, trust-account is always assuming we have
instance profile setup in place
- private static AwsCredentialsProvider
validateAndGetTrustAccountAuthentication(Map<String, String> configuration)
- throws CompilationException {
- String notAllowed = getNonNull(configuration,
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- if (notAllowed != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
-
- String regionId = configuration.get(REGION_FIELD_NAME);
- String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
- String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
- Region region = validateAndGetRegion(regionId);
-
- AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder();
- builder.roleArn(arnRole);
- builder.roleSessionName(UUID.randomUUID().toString());
- builder.durationSeconds(900); // minimum role assume duration = 900
seconds (15 minutes), make configurable?
- if (externalId != null) {
- builder.externalId(externalId);
- }
- AssumeRoleRequest request = builder.build();
- AwsCredentialsProvider credentialsProvider =
validateAndGetInstanceProfileAuthentication(configuration);
-
- // TODO(htowaileb): We shouldn't assume role with each request, rather
stored the received temporary credentials
- // and refresh when expired.
- // assume the role from the provided arn
- try (StsClient stsClient =
-
StsClient.builder().region(region).credentialsProvider(credentialsProvider).build())
{
- AssumeRoleResponse response = stsClient.assumeRole(request);
- Credentials credentials = response.credentials();
- return
StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(),
- credentials.secretAccessKey(),
credentials.sessionToken()));
- } catch (SdkException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, getMessageOrToString(ex));
- }
- }
-
- private static AwsCredentialsProvider
validateAndGetInstanceProfileAuthentication(Map<String, String> configuration)
- throws CompilationException {
- String instanceProfile =
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-
- // only "true" value is allowed
- if (!"true".equalsIgnoreCase(instanceProfile)) {
- throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE,
INSTANCE_PROFILE_FIELD_NAME, "true");
- }
-
- String notAllowed = getNonNull(configuration,
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- if (notAllowed != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
- return InstanceProfileCredentialsProvider.create();
- }
-
- private static AwsCredentialsProvider
validateAndGetAccessKeysAuthentications(Map<String, String> configuration)
- throws CompilationException {
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey =
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
-
- // accessKeyId authentication
- if (accessKeyId == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
ACCESS_KEY_ID_FIELD_NAME,
- SECRET_ACCESS_KEY_FIELD_NAME);
- }
- if (secretAccessKey == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
SECRET_ACCESS_KEY_FIELD_NAME,
- ACCESS_KEY_ID_FIELD_NAME);
- }
-
- String notAllowed = getNonNull(configuration, EXTERNAL_ID_FIELD_NAME);
- if (notAllowed != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
-
- // use session token if provided
- if (sessionToken != null) {
- return StaticCredentialsProvider
- .create(AwsSessionCredentials.create(accessKeyId,
secretAccessKey, sessionToken));
- } else {
- return
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId,
secretAccessKey));
- }
- }
-
- private static boolean noAuth(Map<String, String> configuration) {
- return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME,
ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
- ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
SESSION_TOKEN_FIELD_NAME) == null;
- }
-
- private static String getNonNull(Map<String, String> configuration,
String... fieldNames) {
- for (String fieldName : fieldNames) {
- if (configuration.get(fieldName) != null) {
- return fieldName;
- }
- }
- return null;
- }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
index dc20a89420..a4113d125d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -26,6 +26,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.UUID;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -123,7 +124,7 @@ public class HDFSExternalFileWriterFactory implements
IExternalFileWriterFactory
}
@Override
- public void validate() throws AlgebricksException {
+ public void validate(IApplicationContext appCtx) throws
AlgebricksException {
Configuration conf = HDFSUtils.configureHDFSwrite(configuration);
credentials = HDFSUtils.configureHadoopAuthentication(configuration,
conf);
try {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
index bdefaa85b2..b1d3a956d8 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -20,6 +20,7 @@ package org.apache.asterix.external.writer;
import java.io.File;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -80,7 +81,7 @@ public final class LocalFSExternalFileWriterFactory
implements IExternalFileWrit
}
@Override
- public void validate() throws AlgebricksException {
+ public void validate(IApplicationContext appCtx) throws
AlgebricksException {
// A special case validation for a single node cluster
if (singleNodeCluster && staticPath != null) {
if (isNonEmptyDirectory(new File(staticPath))) {
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index c1528532e9..ef056c958e 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -994,6 +994,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
Map<String, String> configuration, ARecordType itemType,
IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
try {
+ configuration.put(ExternalDataConstants.KEY_DATASET,
dataset.getDatasetName());
configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE,
dataset.getDatabaseName());
configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
dataset.getDataverseName().getCanonicalForm());
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 1caea58ce7..e6716dff29 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -148,7 +148,7 @@ public class ExternalWriterProvider {
String staticPath = staticPathExpr != null ?
ConstantExpressionUtil.getStringConstant(staticPathExpr) : null;
IExternalFileWriterFactory fileWriterFactory =
ExternalWriterProvider.createWriterFactory(appCtx, sink,
staticPath, pathSourceLocation);
- fileWriterFactory.validate();
+ fileWriterFactory.validate(appCtx);
String fileExtension = ExternalWriterProvider.getFileExtension(sink);
int maxResult = ExternalWriterProvider.getMaxResult(sink);
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
index 4a75db6906..2aeca4fe9e 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
@@ -18,11 +18,12 @@
*/
package org.apache.asterix.runtime.writer;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
public interface IExternalWriterFactoryValidator {
/**
* Perform the necessary validation to ensure the writer has the proper
permissions
*/
- void validate() throws AlgebricksException;
+ void validate(IApplicationContext appCtx) throws AlgebricksException;
}