This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit b9d29911b46f2c17420a7d6438a9946133f89501 Author: Hussain Towaileb <[email protected]> AuthorDate: Sun Oct 5 15:23:01 2025 +0300 [NO ISSUE]: Refactor AWS common properties In anticipation of other incoming AWS clients, factor out the common AWS properties to a different class outside of S3-specific classes. Ext-ref: MB-68835 Change-Id: I4d2a50d361ad949d4d3aeee3208bbd8dbb3e83a2 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20463 Tested-by: Hussain Towaileb <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Integration-Tests: Jenkins <[email protected]> --- .../external/ExternalCredentialsCacheUpdater.java | 4 +- .../org/apache/asterix/utils/RedactionUtil.java | 2 +- .../cloud/clients/aws/s3/S3ClientConfig.java | 4 +- .../cloud/writer/S3ExternalFileWriterFactory.java | 4 +- .../input/record/reader/aws/AwsS3InputStream.java | 9 +- .../reader/aws/delta/AwsS3DeltaReaderFactory.java | 2 +- .../aws/parquet/AwsS3ParquetReaderFactory.java | 2 +- .../asterix/external/util/ExternalDataUtils.java | 11 +- .../asterix/external/util/aws/AwsConstants.java | 42 ++ .../apache/asterix/external/util/aws/AwsUtils.java | 286 ++++++++++ .../asterix/external/util/aws/s3/S3AuthUtils.java | 577 --------------------- .../asterix/external/util/aws/s3/S3Constants.java | 16 - .../asterix/external/util/aws/s3/S3Utils.java | 317 ++++++++++- .../input/record/reader/awss3/AwsS3Test.java | 4 +- 14 files changed, 657 insertions(+), 623 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java index 10f536bb71..a9541260c9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java @@ -41,7 +41,7 @@ import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.aws.s3.S3AuthUtils; +import org.apache.asterix.external.util.aws.AwsUtils; import org.apache.asterix.messaging.CCMessageBroker; import org.apache.asterix.messaging.NCMessageBroker; import org.apache.hyracks.api.application.INCServiceContext; @@ -94,7 +94,7 @@ public class ExternalCredentialsCacheUpdater implements IExternalCredentialsCach validateClusterState(); try { LOGGER.info("attempting to update AWS credentials for {}", key); - AwsCredentialsProvider newCredentials = S3AuthUtils.assumeRoleAndGetCredentials(configuration); + AwsCredentialsProvider newCredentials = AwsUtils.assumeRoleAndGetCredentials(configuration); LOGGER.info("updated AWS credentials successfully for {}", key); credentials = (AwsSessionCredentials) newCredentials.resolveCredentials(); appCtx.getExternalCredentialsCache().put(key, credentials); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java index d8b47b88b5..8ef2882fcb 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java @@ -20,7 +20,7 @@ package org.apache.asterix.utils; import static java.util.regex.Pattern.CASE_INSENSITIVE; import static java.util.regex.Pattern.DOTALL; -import static org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.SECRET_ACCESS_KEY_FIELD_NAME; import 
java.util.regex.Pattern; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java index 025af0809f..f05a0e5820 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java @@ -22,7 +22,7 @@ import java.util.Map; import java.util.Objects; import org.apache.asterix.common.config.CloudProperties; -import org.apache.asterix.external.util.aws.s3.S3Constants; +import org.apache.asterix.external.util.aws.AwsConstants; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -90,7 +90,7 @@ public final class S3ClientConfig { public static S3ClientConfig of(Map<String, String> configuration, int writeBufferSize) { // Used to determine local vs. 
actual S3 - String endPoint = configuration.getOrDefault(S3Constants.SERVICE_END_POINT_FIELD_NAME, ""); + String endPoint = configuration.getOrDefault(AwsConstants.SERVICE_END_POINT_FIELD_NAME, ""); // Disabled long profilerLogInterval = 0; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java index b3747468cf..2d618149c1 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java @@ -27,8 +27,8 @@ import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.aws.s3.S3AuthUtils; import org.apache.asterix.external.util.aws.s3.S3Constants; +import org.apache.asterix.external.util.aws.s3.S3Utils; import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration; import org.apache.asterix.runtime.writer.IExternalFileWriter; import org.apache.asterix.runtime.writer.IExternalFileWriterFactory; @@ -67,7 +67,7 @@ public final class S3ExternalFileWriterFactory extends AbstractCloudExternalFile @Override ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException { S3ClientConfig config = S3ClientConfig.of(configuration, writeBufferSize); - return new S3CloudClient(config, S3AuthUtils.buildAwsS3Client(appCtx, configuration), + return new S3CloudClient(config, S3Utils.buildClient(appCtx, configuration), ICloudGuardian.NoOpCloudGuardian.INSTANCE); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java index 60205ece05..bea096671c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@ -35,7 +35,8 @@ import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmb import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream; import org.apache.asterix.external.input.record.reader.stream.AvailableInputStream; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.aws.s3.S3AuthUtils; +import org.apache.asterix.external.util.aws.AwsUtils; +import org.apache.asterix.external.util.aws.s3.S3Utils; import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.CleanupUtils; @@ -96,7 +97,7 @@ public class AwsS3InputStream extends AbstractExternalInputStream { LOGGER.debug(() -> "Key " + userData(request.key()) + " was not found in bucket {}" + request.bucket()); return false; } catch (S3Exception ex) { - if (S3AuthUtils.isArnAssumedRoleExpiredToken(configuration, ex.awsErrorDetails().errorCode())) { + if (AwsUtils.isArnAssumedRoleExpiredToken(configuration, ex.awsErrorDetails().errorCode())) { LOGGER.debug(() -> "Expired AWS assume role session, will attempt to refresh the session"); rebuildAwsS3Client(configuration); LOGGER.debug(() -> "Successfully refreshed AWS assume role session"); @@ -120,7 +121,7 @@ public class AwsS3InputStream extends AbstractExternalInputStream { } private boolean shouldRetry(String errorCode, int currentRetry) { - return currentRetry < MAX_RETRIES && S3AuthUtils.isRetryableError(errorCode); + return currentRetry < MAX_RETRIES && 
S3Utils.isRetryableError(errorCode); } @Override @@ -148,7 +149,7 @@ public class AwsS3InputStream extends AbstractExternalInputStream { private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException { try { - return S3AuthUtils.buildAwsS3Client(ncAppCtx, configuration); + return S3Utils.buildClient(ncAppCtx, configuration); } catch (CompilationException ex) { throw HyracksDataException.create(ex); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java index 7d678a9e46..51373a1d94 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.external.input.record.reader.aws.delta; -import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf; +import static org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf; import java.util.Collections; import java.util.List; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java index f5dc7a2eb6..833cba541e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java @@ -18,7 +18,7 @@ */ package 
org.apache.asterix.external.input.record.reader.aws.parquet; -import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf; +import static org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf; import static org.apache.asterix.external.util.aws.s3.S3Utils.listS3Objects; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 9926e09e4c..ea7b9c5a68 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -32,7 +32,7 @@ import static org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PATH; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START; -import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf; +import static org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf; import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties; import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties; import static org.apache.asterix.external.util.google.gcs.GCSAuthUtils.configureHdfsJobConf; @@ -81,7 +81,7 @@ import org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna import org.apache.asterix.external.library.JavaLibrary; import org.apache.asterix.external.library.msgpack.MessagePackUtils; import 
org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions; -import org.apache.asterix.external.util.aws.s3.S3AuthUtils; +import org.apache.asterix.external.util.aws.AwsConstants; import org.apache.asterix.external.util.aws.s3.S3Constants; import org.apache.asterix.external.util.aws.s3.S3Utils; import org.apache.asterix.external.util.azure.blob_storage.AzureConstants; @@ -603,8 +603,9 @@ public class ExternalDataUtils { // If the table is in S3 if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) { - conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME)); - conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME)); + conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(AwsConstants.ACCESS_KEY_ID_FIELD_NAME)); + conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, + configuration.get(AwsConstants.SECRET_ACCESS_KEY_FIELD_NAME)); tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://" + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); @@ -728,7 +729,7 @@ public class ExternalDataUtils { switch (type) { case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3: - S3AuthUtils.validateProperties(appCtx, configuration, srcLoc, collector); + S3Utils.validateProperties(appCtx, configuration, srcLoc, collector); break; case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB: validateAzureBlobProperties(configuration, srcLoc, collector, appCtx); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsConstants.java new file mode 100644 index 0000000000..7868bdf2ca --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsConstants.java @@ 
-0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.util.aws; + +public class AwsConstants { + private AwsConstants() { + throw new AssertionError("do not instantiate"); + } + + // Authentication specific parameters + public static final String REGION_FIELD_NAME = "region"; + public static final String CROSS_REGION_FIELD_NAME = "crossRegion"; + public static final String INSTANCE_PROFILE_FIELD_NAME = "instanceProfile"; + public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId"; + public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey"; + public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken"; + public static final String ROLE_ARN_FIELD_NAME = "roleArn"; + public static final String EXTERNAL_ID_FIELD_NAME = "externalId"; + public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint"; + + // AWS specific error codes + public static final String ERROR_INTERNAL_ERROR = "InternalError"; + public static final String ERROR_SLOW_DOWN = "SlowDown"; + public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented"; + public static final String ERROR_EXPIRED_TOKEN = "ExpiredToken"; +} diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java new file mode 100644 index 0000000000..a648853a46 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.util.aws; + +import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE; +import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT; +import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT; +import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED; +import static org.apache.asterix.external.util.aws.AwsConstants.ACCESS_KEY_ID_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.CROSS_REGION_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.ERROR_EXPIRED_TOKEN; +import static org.apache.asterix.external.util.aws.AwsConstants.EXTERNAL_ID_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.INSTANCE_PROFILE_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.ROLE_ARN_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.SECRET_ACCESS_KEY_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.SESSION_TOKEN_FIELD_NAME; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.external.IExternalCredentialsCache; +import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import 
software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; +import software.amazon.awssdk.services.sts.model.Credentials; + +public class AwsUtils { + + public enum AuthenticationType { + ANONYMOUS, + ARN_ASSUME_ROLE, + INSTANCE_PROFILE, + ACCESS_KEYS, + BAD_AUTHENTICATION + } + + private AwsUtils() { + throw new AssertionError("do not instantiate"); + } + + public static boolean isArnAssumedRoleExpiredToken(Map<String, String> configuration, String errorCode) { + return ERROR_EXPIRED_TOKEN.equals(errorCode) + && getAuthenticationType(configuration) == AuthenticationType.ARN_ASSUME_ROLE; + } + + public static AwsCredentialsProvider buildCredentialsProvider(IApplicationContext appCtx, + Map<String, String> configuration) throws CompilationException { + AuthenticationType authenticationType = getAuthenticationType(configuration); + switch (authenticationType) { + case ANONYMOUS: + return AnonymousCredentialsProvider.create(); + case ARN_ASSUME_ROLE: + return getTrustAccountCredentials(appCtx, configuration); + case INSTANCE_PROFILE: + return getInstanceProfileCredentials(configuration); + case ACCESS_KEYS: + return getAccessKeyCredentials(configuration); + default: + // missing required creds, report correct error message + String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME); + if (externalId != null) { + throw 
new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME, + EXTERNAL_ID_FIELD_NAME); + } else { + throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME, + SESSION_TOKEN_FIELD_NAME); + } + } + } + + public static Region validateAndGetRegion(String regionId) throws CompilationException { + List<Region> regions = S3Client.serviceMetadata().regions(); + Optional<Region> selectedRegion = regions.stream().filter(region -> region.id().equals(regionId)).findFirst(); + + if (selectedRegion.isEmpty()) { + throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId); + } + return selectedRegion.get(); + } + + public static AuthenticationType getAuthenticationType(Map<String, String> configuration) { + String roleArn = configuration.get(ROLE_ARN_FIELD_NAME); + String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME); + String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); + String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); + + if (noAuth(configuration)) { + return AuthenticationType.ANONYMOUS; + } else if (roleArn != null) { + return AuthenticationType.ARN_ASSUME_ROLE; + } else if (instanceProfile != null) { + return AuthenticationType.INSTANCE_PROFILE; + } else if (accessKeyId != null || secretAccessKey != null) { + return AuthenticationType.ACCESS_KEYS; + } else { + return AuthenticationType.BAD_AUTHENTICATION; + } + } + + public static boolean validateAndGetCrossRegion(String crossRegion) throws CompilationException { + if (crossRegion == null) { + return false; + } + if (!"true".equalsIgnoreCase(crossRegion) && !"false".equalsIgnoreCase(crossRegion)) { + throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, CROSS_REGION_FIELD_NAME, "true, false"); + } + return Boolean.parseBoolean(crossRegion); + } + + private static boolean noAuth(Map<String, String> configuration) { + return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, 
ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME, + ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, SESSION_TOKEN_FIELD_NAME) == null; + } + + /** + * Returns the cached credentials if valid, otherwise, generates new credentials + * + * @param appCtx application context + * @param configuration configuration + * @return returns the cached credentials if valid, otherwise, generates new credentials + * @throws CompilationException CompilationException + */ + public static AwsCredentialsProvider getTrustAccountCredentials(IApplicationContext appCtx, + Map<String, String> configuration) throws CompilationException { + IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache(); + Object credentialsObject = cache.get(configuration.get(ExternalDataConstants.KEY_ENTITY_ID)); + if (credentialsObject != null) { + return () -> (AwsSessionCredentials) credentialsObject; + } + IExternalCredentialsCacheUpdater cacheUpdater = appCtx.getExternalCredentialsCacheUpdater(); + AwsSessionCredentials credentials; + try { + credentials = (AwsSessionCredentials) cacheUpdater.generateAndCacheCredentials(configuration); + } catch (HyracksDataException ex) { + throw new CompilationException(ErrorCode.FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION, ex, ex.getMessage()); + } + + return () -> credentials; + } + + /** + * Assume role using provided credentials and return the new credentials + * + * @param configuration configuration + * @return return credentials from the assume role + * @throws CompilationException CompilationException + */ + public static AwsCredentialsProvider assumeRoleAndGetCredentials(Map<String, String> configuration) + throws CompilationException { + String regionId = configuration.get(REGION_FIELD_NAME); + String arnRole = configuration.get(ROLE_ARN_FIELD_NAME); + String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME); + Region region = validateAndGetRegion(regionId); + + AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder(); + 
builder.roleArn(arnRole); + builder.roleSessionName(UUID.randomUUID().toString()); + builder.durationSeconds(900); // TODO(htowaileb): configurable? Can be 900 to 43200 (15 mins to 12 hours) + if (externalId != null) { + builder.externalId(externalId); + } + AssumeRoleRequest request = builder.build(); + AwsCredentialsProvider credentialsProvider = getCredentialsToAssumeRole(configuration); + + // assume the role from the provided arn + try (StsClient stsClient = + StsClient.builder().region(region).credentialsProvider(credentialsProvider).build()) { + AssumeRoleResponse response = stsClient.assumeRole(request); + Credentials credentials = response.credentials(); + return StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(), + credentials.secretAccessKey(), credentials.sessionToken())); + } catch (SdkException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); + } + } + + private static AwsCredentialsProvider getCredentialsToAssumeRole(Map<String, String> configuration) + throws CompilationException { + String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME); + String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); + String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); + if (instanceProfile != null) { + return getInstanceProfileCredentials(configuration); + } else if (accessKeyId != null || secretAccessKey != null) { + return getAccessKeyCredentials(configuration); + } else { + throw new CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION); + } + } + + private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration) + throws CompilationException { + String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME); + + // only "true" value is allowed + if (!"true".equalsIgnoreCase(instanceProfile)) { + throw new 
CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INSTANCE_PROFILE_FIELD_NAME, "true"); + } + + String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, + SESSION_TOKEN_FIELD_NAME); + if (notAllowed != null) { + throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, + INSTANCE_PROFILE_FIELD_NAME); + } + return InstanceProfileCredentialsProvider.create(); + } + + private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration) + throws CompilationException { + String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); + String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); + String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME); + + if (accessKeyId == null) { + throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME, + SECRET_ACCESS_KEY_FIELD_NAME); + } + if (secretAccessKey == null) { + throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME, + ACCESS_KEY_ID_FIELD_NAME); + } + + // use session token if provided + if (sessionToken != null) { + return StaticCredentialsProvider + .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken)); + } else { + return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey)); + } + } + + private static String getNonNull(Map<String, String> configuration, String... fieldNames) { + for (String fieldName : fieldNames) { + if (configuration.get(fieldName) != null) { + return fieldName; + } + } + return null; + } + + /** + * Generates a random external ID to be used in cross-account role assumption. 
+ * + * @return external id + */ + public static String generateExternalId() { + return UUID.randomUUID().toString(); + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java deleted file mode 100644 index 90bb04f309..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java +++ /dev/null @@ -1,577 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.external.util.aws.s3; - -import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE; -import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT; -import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT; -import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED; -import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; -import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; -import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists; -import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties; -import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; -import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME; -import static org.apache.asterix.external.util.aws.s3.S3Constants.CROSS_REGION_FIELD_NAME; -import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_EXPIRED_TOKEN; -import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR; -import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED; -import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN; -import static org.apache.asterix.external.util.aws.s3.S3Constants.EXTERNAL_ID_FIELD_NAME; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUMED_ROLE; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_ARN; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_EXTERNAL_ID; -import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INSTANCE_PROFILE; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_REGION; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SIMPLE; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMPORARY; -import static org.apache.asterix.external.util.aws.s3.S3Constants.INSTANCE_PROFILE_FIELD_NAME; -import static org.apache.asterix.external.util.aws.s3.S3Constants.PATH_STYLE_ADDRESSING_FIELD_NAME; -import static org.apache.asterix.external.util.aws.s3.S3Constants.REGION_FIELD_NAME; -import static org.apache.asterix.external.util.aws.s3.S3Constants.ROLE_ARN_FIELD_NAME; -import static org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME; -import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME; -import static org.apache.asterix.external.util.aws.s3.S3Constants.SESSION_TOKEN_FIELD_NAME; -import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; - -import java.net.URI; -import 
java.net.URISyntaxException; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; - -import org.apache.asterix.common.api.IApplicationContext; -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.external.IExternalCredentialsCache; -import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater; -import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.ExternalDataPrefix; -import org.apache.hadoop.fs.s3a.Constants; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.exceptions.IWarningCollector; -import org.apache.hyracks.api.exceptions.SourceLocation; -import org.apache.hyracks.api.exceptions.Warning; -import org.apache.hyracks.api.util.CleanupUtils; - -import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; -import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.exception.SdkException; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3ClientBuilder; -import software.amazon.awssdk.services.s3.model.ListObjectsResponse; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; -import software.amazon.awssdk.services.s3.model.S3Exception; -import software.amazon.awssdk.services.s3.model.S3Response; -import 
software.amazon.awssdk.services.sts.StsClient; -import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; -import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; -import software.amazon.awssdk.services.sts.model.Credentials; - -public class S3AuthUtils { - enum AuthenticationType { - ANONYMOUS, - ARN_ASSUME_ROLE, - INSTANCE_PROFILE, - ACCESS_KEYS, - BAD_AUTHENTICATION - } - - private S3AuthUtils() { - throw new AssertionError("do not instantiate"); - } - - public static boolean isRetryableError(String errorCode) { - return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN); - } - - public static boolean isArnAssumedRoleExpiredToken(Map<String, String> configuration, String errorCode) { - return ERROR_EXPIRED_TOKEN.equals(errorCode) - && getAuthenticationType(configuration) == AuthenticationType.ARN_ASSUME_ROLE; - } - - /** - * Builds the S3 client using the provided configuration - * - * @param configuration properties - * @return S3 client - * @throws CompilationException CompilationException - */ - public static S3Client buildAwsS3Client(IApplicationContext appCtx, Map<String, String> configuration) - throws CompilationException { - String regionId = configuration.get(REGION_FIELD_NAME); - String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); - - Region region = validateAndGetRegion(regionId); - boolean crossRegion = validateAndGetCrossRegion(configuration.get(CROSS_REGION_FIELD_NAME)); - AwsCredentialsProvider credentialsProvider = buildCredentialsProvider(appCtx, configuration); - - S3ClientBuilder builder = S3Client.builder(); - builder.region(region); - builder.crossRegionAccessEnabled(crossRegion); - builder.credentialsProvider(credentialsProvider); - - // Validate the service endpoint if present - if (serviceEndpoint != null) { - try { - URI uri = new URI(serviceEndpoint); - try { - builder.endpointOverride(uri); - } catch (NullPointerException ex) { - throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); - } - } catch (URISyntaxException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, - String.format("Invalid service endpoint %s", serviceEndpoint)); - } - } - - boolean pathStyleAddressing = - validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME), serviceEndpoint); - builder.forcePathStyle(pathStyleAddressing); - return builder.build(); - } - - public static AwsCredentialsProvider buildCredentialsProvider(IApplicationContext appCtx, - Map<String, String> configuration) throws CompilationException { - AuthenticationType authenticationType = getAuthenticationType(configuration); - switch (authenticationType) { - case ANONYMOUS: - return AnonymousCredentialsProvider.create(); - case ARN_ASSUME_ROLE: - return getTrustAccountCredentials(appCtx, configuration); - case INSTANCE_PROFILE: - return getInstanceProfileCredentials(configuration); - case ACCESS_KEYS: - return getAccessKeyCredentials(configuration); - default: - // missing required creds, report correct error message - String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME); - if (externalId != null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME, - EXTERNAL_ID_FIELD_NAME); - } else { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME, - SESSION_TOKEN_FIELD_NAME); - } - } - } - - public static Region validateAndGetRegion(String regionId) throws CompilationException { - List<Region> regions = S3Client.serviceMetadata().regions(); - Optional<Region> selectedRegion = regions.stream().filter(region -> region.id().equals(regionId)).findFirst(); - - if (selectedRegion.isEmpty()) { - throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId); - } - return selectedRegion.get(); - } - - private static AuthenticationType getAuthenticationType(Map<String, String> configuration) { - 
String roleArn = configuration.get(ROLE_ARN_FIELD_NAME); - String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME); - String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); - String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); - - if (noAuth(configuration)) { - return AuthenticationType.ANONYMOUS; - } else if (roleArn != null) { - return AuthenticationType.ARN_ASSUME_ROLE; - } else if (instanceProfile != null) { - return AuthenticationType.INSTANCE_PROFILE; - } else if (accessKeyId != null || secretAccessKey != null) { - return AuthenticationType.ACCESS_KEYS; - } else { - return AuthenticationType.BAD_AUTHENTICATION; - } - } - - public static boolean validateAndGetPathStyleAddressing(String pathStyleAddressing, String endpoint) - throws CompilationException { - if (pathStyleAddressing == null) { - return endpoint != null && !endpoint.isEmpty(); - } - validatePathStyleAddressing(pathStyleAddressing); - return Boolean.parseBoolean(pathStyleAddressing); - } - - public static void validatePathStyleAddressing(String pathStyleAddressing) throws CompilationException { - if (pathStyleAddressing == null) { - return; - } - if (!"true".equalsIgnoreCase(pathStyleAddressing) && !"false".equalsIgnoreCase(pathStyleAddressing)) { - throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, PATH_STYLE_ADDRESSING_FIELD_NAME, - "true, false"); - } - } - - public static boolean validateAndGetCrossRegion(String crossRegion) throws CompilationException { - if (crossRegion == null) { - return false; - } - if (!"true".equalsIgnoreCase(crossRegion) && !"false".equalsIgnoreCase(crossRegion)) { - throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, CROSS_REGION_FIELD_NAME, "true, false"); - } - return Boolean.parseBoolean(crossRegion); - } - - private static boolean noAuth(Map<String, String> configuration) { - return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME, - 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, SESSION_TOKEN_FIELD_NAME) == null; - } - - /** - * Returns the cached credentials if valid, otherwise, generates new credentials - * - * @param appCtx application context - * @param configuration configuration - * @return returns the cached credentials if valid, otherwise, generates new credentials - * @throws CompilationException CompilationException - */ - public static AwsCredentialsProvider getTrustAccountCredentials(IApplicationContext appCtx, - Map<String, String> configuration) throws CompilationException { - IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache(); - Object credentialsObject = cache.get(configuration.get(ExternalDataConstants.KEY_ENTITY_ID)); - if (credentialsObject != null) { - return () -> (AwsSessionCredentials) credentialsObject; - } - IExternalCredentialsCacheUpdater cacheUpdater = appCtx.getExternalCredentialsCacheUpdater(); - AwsSessionCredentials credentials; - try { - credentials = (AwsSessionCredentials) cacheUpdater.generateAndCacheCredentials(configuration); - } catch (HyracksDataException ex) { - throw new CompilationException(ErrorCode.FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION, ex, ex.getMessage()); - } - - return () -> credentials; - } - - /** - * Assume role using provided credentials and return the new credentials - * - * @param configuration configuration - * @return return credentials from the assume role - * @throws CompilationException CompilationException - */ - public static AwsCredentialsProvider assumeRoleAndGetCredentials(Map<String, String> configuration) - throws CompilationException { - String regionId = configuration.get(REGION_FIELD_NAME); - String arnRole = configuration.get(ROLE_ARN_FIELD_NAME); - String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME); - Region region = validateAndGetRegion(regionId); - - AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder(); - builder.roleArn(arnRole); - 
builder.roleSessionName(UUID.randomUUID().toString()); - builder.durationSeconds(900); // TODO(htowaileb): configurable? Can be 900 to 43200 (15 mins to 12 hours) - if (externalId != null) { - builder.externalId(externalId); - } - AssumeRoleRequest request = builder.build(); - AwsCredentialsProvider credentialsProvider = getCredentialsToAssumeRole(configuration); - - // assume the role from the provided arn - try (StsClient stsClient = - StsClient.builder().region(region).credentialsProvider(credentialsProvider).build()) { - AssumeRoleResponse response = stsClient.assumeRole(request); - Credentials credentials = response.credentials(); - return StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(), - credentials.secretAccessKey(), credentials.sessionToken())); - } catch (SdkException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); - } - } - - private static AwsCredentialsProvider getCredentialsToAssumeRole(Map<String, String> configuration) - throws CompilationException { - String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME); - String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); - String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); - if (instanceProfile != null) { - return getInstanceProfileCredentials(configuration); - } else if (accessKeyId != null || secretAccessKey != null) { - return getAccessKeyCredentials(configuration); - } else { - throw new CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION); - } - } - - private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration) - throws CompilationException { - String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME); - - // only "true" value is allowed - if (!"true".equalsIgnoreCase(instanceProfile)) { - throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
INSTANCE_PROFILE_FIELD_NAME, "true"); - } - - String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, - SESSION_TOKEN_FIELD_NAME); - if (notAllowed != null) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, - INSTANCE_PROFILE_FIELD_NAME); - } - return InstanceProfileCredentialsProvider.create(); - } - - private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration) - throws CompilationException { - String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); - String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); - String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME); - - if (accessKeyId == null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME, - SECRET_ACCESS_KEY_FIELD_NAME); - } - if (secretAccessKey == null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME, - ACCESS_KEY_ID_FIELD_NAME); - } - - // use session token if provided - if (sessionToken != null) { - return StaticCredentialsProvider - .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken)); - } else { - return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey)); - } - } - - private static String getNonNull(Map<String, String> configuration, String... 
fieldNames) { - for (String fieldName : fieldNames) { - if (configuration.get(fieldName) != null) { - return fieldName; - } - } - return null; - } - - public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf conf, - Map<String, String> configuration) throws CompilationException { - configureAwsS3HdfsJobConf(appCtx, conf, configuration, 0); - } - - /** - * Builds the S3 client using the provided configuration - * - * @param appCtx application context - * @param configuration properties - * @param numberOfPartitions number of partitions in the cluster - */ - public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf jobConf, - Map<String, String> configuration, int numberOfPartitions) throws CompilationException { - setHadoopCredentials(appCtx, jobConf, configuration); - String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); - Region region = validateAndGetRegion(configuration.get(REGION_FIELD_NAME)); - jobConf.set(HADOOP_REGION, region.toString()); - if (serviceEndpoint != null) { - // Validation of the URL should be done at hadoop-aws level - jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint); - } else { - //Region is ignored and buckets could be found by the central endpoint - jobConf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT); - } - - boolean pathStyleAddressing = - validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME), serviceEndpoint); - if (pathStyleAddressing) { - jobConf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE); - } else { - jobConf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.FALSE); - } - - /* - * Set the size of S3 connection pool to be the number of partitions - */ - if (numberOfPartitions != 0) { - jobConf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions)); - } - } - - /** - * Sets the credentials provider type and the credentials to hadoop based on the provided configuration - * - * @param 
jobConf hadoop job config - * @param configuration external details configuration - */ - private static void setHadoopCredentials(IApplicationContext appCtx, JobConf jobConf, - Map<String, String> configuration) throws CompilationException { - AuthenticationType authenticationType = getAuthenticationType(configuration); - switch (authenticationType) { - case ANONYMOUS: - jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS); - break; - case ARN_ASSUME_ROLE: - jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ASSUMED_ROLE); - jobConf.set(HADOOP_ASSUME_ROLE_ARN, configuration.get(ROLE_ARN_FIELD_NAME)); - jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, configuration.get(EXTERNAL_ID_FIELD_NAME)); - jobConf.set(HADOOP_ASSUME_ROLE_SESSION_NAME, "parquet-" + UUID.randomUUID()); - - // hadoop accepts time 15m to 1h, we will base it on the provided configuration - int durationInSeconds = appCtx.getExternalProperties().getAwsAssumeRoleDuration(); - String hadoopDuration = getHadoopDuration(durationInSeconds); - jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, hadoopDuration); - - // TODO: this assumes basic keys always, also support if we use InstanceProfile to assume a role - jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY, HADOOP_SIMPLE); - jobConf.set(HADOOP_ACCESS_KEY_ID, configuration.get(ACCESS_KEY_ID_FIELD_NAME)); - jobConf.set(HADOOP_SECRET_ACCESS_KEY, configuration.get(SECRET_ACCESS_KEY_FIELD_NAME)); - break; - case INSTANCE_PROFILE: - jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_INSTANCE_PROFILE); - break; - case ACCESS_KEYS: - jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_SIMPLE); - jobConf.set(HADOOP_ACCESS_KEY_ID, configuration.get(ACCESS_KEY_ID_FIELD_NAME)); - jobConf.set(HADOOP_SECRET_ACCESS_KEY, configuration.get(SECRET_ACCESS_KEY_FIELD_NAME)); - if (configuration.get(SESSION_TOKEN_FIELD_NAME) != null) { - jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMPORARY); - jobConf.set(HADOOP_SESSION_TOKEN, 
configuration.get(SESSION_TOKEN_FIELD_NAME)); - } - break; - case BAD_AUTHENTICATION: - throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED); - } - } - - /** - * Hadoop accepts duration values from 15m to 1h (in this format). We will base this on the configured - * duration in seconds. If the time exceeds 1 hour, we will return 1h - * - * @param seconds configured duration in seconds - * @return hadoop updated duration - */ - private static String getHadoopDuration(int seconds) { - // constants for time thresholds - final int FIFTEEN_MINUTES_IN_SECONDS = 15 * 60; - final int ONE_HOUR_IN_SECONDS = 60 * 60; - - // Adjust seconds to fit within bounds - if (seconds < FIFTEEN_MINUTES_IN_SECONDS) { - seconds = FIFTEEN_MINUTES_IN_SECONDS; - } else if (seconds > ONE_HOUR_IN_SECONDS) { - seconds = ONE_HOUR_IN_SECONDS; - } - - // Convert seconds to minutes - int minutes = seconds / 60; - - // Format the result - if (minutes == 60) { - return "1h"; - } else { - return minutes + "m"; - } - } - - /** - * Validate external dataset properties - * - * @param configuration properties - * @throws CompilationException Compilation exception - */ - public static void validateProperties(IApplicationContext appCtx, Map<String, String> configuration, - SourceLocation srcLoc, IWarningCollector collector) throws CompilationException { - if (isDeltaTable(configuration)) { - validateDeltaTableProperties(configuration); - } - // check if the format property is present - else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { - throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); - } - - validateIncludeExclude(configuration); - try { - // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation - new ExternalDataPrefix(configuration); - } catch (AlgebricksException ex) { - throw new CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex); - 
} - - // Check if the bucket is present - S3Client s3Client = buildAwsS3Client(appCtx, configuration); - S3Response response; - boolean useOldApi = false; - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - String prefix = getPrefix(configuration); - - try { - response = S3Utils.isBucketEmpty(s3Client, container, prefix, false); - } catch (S3Exception ex) { - // Method not implemented, try falling back to old API - try { - // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html - if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) { - useOldApi = true; - response = S3Utils.isBucketEmpty(s3Client, container, prefix, true); - } else { - throw ex; - } - } catch (SdkException ex2) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2, getMessageOrToString(ex)); - } - } catch (SdkException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); - } finally { - if (s3Client != null) { - CleanupUtils.close(s3Client, null); - } - } - - boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty() - : ((ListObjectsV2Response) response).contents().isEmpty(); - if (isEmpty && collector.shouldWarn()) { - Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); - collector.warn(warning); - } - - // Returns 200 only in case the bucket exists, otherwise, throws an exception. 
However, to - // ensure coverage, check if the result is successful as well and not only catch exceptions - if (!response.sdkHttpResponse().isSuccessful()) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); - } - if (isDeltaTable(configuration)) { - try { - validateDeltaTableExists(appCtx, configuration); - } catch (AlgebricksException e) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e); - } - } - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java index 102c378cca..3cfef1e996 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java @@ -26,23 +26,7 @@ public class S3Constants { // Key max length public static final int MAX_KEY_LENGTH_IN_BYTES = 1024; - // Authentication specific parameters - public static final String REGION_FIELD_NAME = "region"; public static final String PATH_STYLE_ADDRESSING_FIELD_NAME = "pathStyleAddressing"; - public static final String CROSS_REGION_FIELD_NAME = "crossRegion"; - public static final String INSTANCE_PROFILE_FIELD_NAME = "instanceProfile"; - public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId"; - public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey"; - public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken"; - public static final String ROLE_ARN_FIELD_NAME = "roleArn"; - public static final String EXTERNAL_ID_FIELD_NAME = "externalId"; - public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint"; - - // AWS S3 specific error codes - public static final String ERROR_INTERNAL_ERROR = "InternalError"; - public static final String ERROR_SLOW_DOWN = "SlowDown"; - 
public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented"; - public static final String ERROR_EXPIRED_TOKEN = "ExpiredToken"; /* * Hadoop-AWS diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java index 7783456871..e20e39f2bf 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java @@ -18,10 +18,50 @@ */ package org.apache.asterix.external.util.aws.s3; +import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE; import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; -import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED; +import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; +import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists; +import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties; +import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; +import static org.apache.asterix.external.util.aws.AwsConstants.ACCESS_KEY_ID_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.CROSS_REGION_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.ERROR_INTERNAL_ERROR; +import static org.apache.asterix.external.util.aws.AwsConstants.ERROR_METHOD_NOT_IMPLEMENTED; +import static org.apache.asterix.external.util.aws.AwsConstants.ERROR_SLOW_DOWN; +import static org.apache.asterix.external.util.aws.AwsConstants.EXTERNAL_ID_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME; +import static 
org.apache.asterix.external.util.aws.AwsConstants.ROLE_ARN_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.SECRET_ACCESS_KEY_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.SERVICE_END_POINT_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.SESSION_TOKEN_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsUtils.buildCredentialsProvider; +import static org.apache.asterix.external.util.aws.AwsUtils.getAuthenticationType; +import static org.apache.asterix.external.util.aws.AwsUtils.validateAndGetCrossRegion; +import static org.apache.asterix.external.util.aws.AwsUtils.validateAndGetRegion; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUMED_ROLE; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_ARN; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_EXTERNAL_ID; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INSTANCE_PROFILE; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_REGION; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE; +import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SIMPLE; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMPORARY; +import static org.apache.asterix.external.util.aws.s3.S3Constants.PATH_STYLE_ADDRESSING_FIELD_NAME; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -38,13 +78,21 @@ import org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataPrefix; import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.aws.AwsUtils; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.exceptions.Warning; import org.apache.hyracks.api.util.CleanupUtils; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.model.CommonPrefix; import software.amazon.awssdk.services.s3.model.ListObjectsRequest; import 
software.amazon.awssdk.services.s3.model.ListObjectsResponse; @@ -60,6 +108,241 @@ public class S3Utils { throw new AssertionError("do not instantiate"); } + /** + * Builds the S3 client using the provided configuration + * + * @param configuration properties + * @return S3 client + * @throws CompilationException CompilationException + */ + public static S3Client buildClient(IApplicationContext appCtx, Map<String, String> configuration) + throws CompilationException { + String regionId = configuration.get(REGION_FIELD_NAME); + String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); + + Region region = validateAndGetRegion(regionId); + boolean crossRegion = validateAndGetCrossRegion(configuration.get(CROSS_REGION_FIELD_NAME)); + AwsCredentialsProvider credentialsProvider = buildCredentialsProvider(appCtx, configuration); + + S3ClientBuilder builder = S3Client.builder(); + builder.region(region); + builder.crossRegionAccessEnabled(crossRegion); + builder.credentialsProvider(credentialsProvider); + + // Validate the service endpoint if present + if (serviceEndpoint != null) { + try { + URI uri = new URI(serviceEndpoint); + try { + builder.endpointOverride(uri); + } catch (NullPointerException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); + } + } catch (URISyntaxException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, + String.format("Invalid service endpoint %s", serviceEndpoint)); + } + } + + boolean pathStyleAddressing = + validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME), serviceEndpoint); + builder.forcePathStyle(pathStyleAddressing); + return builder.build(); + } + + public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf conf, + Map<String, String> configuration) throws CompilationException { + configureAwsS3HdfsJobConf(appCtx, conf, configuration, 0); + } + + /** + * Builds the S3 client using the 
provided configuration + * + * @param appCtx application context + * @param configuration properties + * @param numberOfPartitions number of partitions in the cluster + */ + public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf jobConf, + Map<String, String> configuration, int numberOfPartitions) throws CompilationException { + setHadoopCredentials(appCtx, jobConf, configuration); + String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); + Region region = validateAndGetRegion(configuration.get(REGION_FIELD_NAME)); + jobConf.set(HADOOP_REGION, region.toString()); + if (serviceEndpoint != null) { + // Validation of the URL should be done at hadoop-aws level + jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint); + } else { + //Region is ignored and buckets could be found by the central endpoint + jobConf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT); + } + + boolean pathStyleAddressing = + validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME), serviceEndpoint); + if (pathStyleAddressing) { + jobConf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE); + } else { + jobConf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.FALSE); + } + + /* + * Set the size of S3 connection pool to be the number of partitions + */ + if (numberOfPartitions != 0) { + jobConf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions)); + } + } + + /** + * Sets the credentials provider type and the credentials to hadoop based on the provided configuration + * + * @param jobConf hadoop job config + * @param configuration external details configuration + */ + private static void setHadoopCredentials(IApplicationContext appCtx, JobConf jobConf, + Map<String, String> configuration) throws CompilationException { + AwsUtils.AuthenticationType authenticationType = getAuthenticationType(configuration); + switch (authenticationType) { + case ANONYMOUS: + 
jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS); + break; + case ARN_ASSUME_ROLE: + jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ASSUMED_ROLE); + jobConf.set(HADOOP_ASSUME_ROLE_ARN, configuration.get(ROLE_ARN_FIELD_NAME)); + jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, configuration.get(EXTERNAL_ID_FIELD_NAME)); + jobConf.set(HADOOP_ASSUME_ROLE_SESSION_NAME, "parquet-" + UUID.randomUUID()); + + // hadoop accepts time 15m to 1h, we will base it on the provided configuration + int durationInSeconds = appCtx.getExternalProperties().getAwsAssumeRoleDuration(); + String hadoopDuration = getHadoopDuration(durationInSeconds); + jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, hadoopDuration); + + // TODO: this assumes basic keys always, also support if we use InstanceProfile to assume a role + jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY, HADOOP_SIMPLE); + jobConf.set(HADOOP_ACCESS_KEY_ID, configuration.get(ACCESS_KEY_ID_FIELD_NAME)); + jobConf.set(HADOOP_SECRET_ACCESS_KEY, configuration.get(SECRET_ACCESS_KEY_FIELD_NAME)); + break; + case INSTANCE_PROFILE: + jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_INSTANCE_PROFILE); + break; + case ACCESS_KEYS: + jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_SIMPLE); + jobConf.set(HADOOP_ACCESS_KEY_ID, configuration.get(ACCESS_KEY_ID_FIELD_NAME)); + jobConf.set(HADOOP_SECRET_ACCESS_KEY, configuration.get(SECRET_ACCESS_KEY_FIELD_NAME)); + if (configuration.get(SESSION_TOKEN_FIELD_NAME) != null) { + jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMPORARY); + jobConf.set(HADOOP_SESSION_TOKEN, configuration.get(SESSION_TOKEN_FIELD_NAME)); + } + break; + case BAD_AUTHENTICATION: + throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED); + } + } + + /** + * Hadoop accepts duration values from 15m to 1h (in this format). We will base this on the configured + * duration in seconds. 
If the time exceeds 1 hour, we will return 1h + * + * @param seconds configured duration in seconds + * @return hadoop updated duration + */ + private static String getHadoopDuration(int seconds) { + // constants for time thresholds + final int FIFTEEN_MINUTES_IN_SECONDS = 15 * 60; + final int ONE_HOUR_IN_SECONDS = 60 * 60; + + // Adjust seconds to fit within bounds + if (seconds < FIFTEEN_MINUTES_IN_SECONDS) { + seconds = FIFTEEN_MINUTES_IN_SECONDS; + } else if (seconds > ONE_HOUR_IN_SECONDS) { + seconds = ONE_HOUR_IN_SECONDS; + } + + // Convert seconds to minutes + int minutes = seconds / 60; + + // Format the result + if (minutes == 60) { + return "1h"; + } else { + return minutes + "m"; + } + } + + /** + * Validate external dataset properties + * + * @param configuration properties + * @throws CompilationException Compilation exception + */ + public static void validateProperties(IApplicationContext appCtx, Map<String, String> configuration, + SourceLocation srcLoc, IWarningCollector collector) throws CompilationException { + if (isDeltaTable(configuration)) { + validateDeltaTableProperties(configuration); + } + // check if the format property is present + else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { + throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); + } + + validateIncludeExclude(configuration); + try { + // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation + new ExternalDataPrefix(configuration); + } catch (AlgebricksException ex) { + throw new CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex); + } + + // Check if the bucket is present + S3Client s3Client = buildClient(appCtx, configuration); + S3Response response; + boolean useOldApi = false; + String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); + String prefix = getPrefix(configuration); + + try { + response = 
S3Utils.isBucketEmpty(s3Client, container, prefix, false); + } catch (S3Exception ex) { + // Method not implemented, try falling back to old API + try { + // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html + if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) { + useOldApi = true; + response = S3Utils.isBucketEmpty(s3Client, container, prefix, true); + } else { + throw ex; + } + } catch (SdkException ex2) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2, getMessageOrToString(ex)); + } + } catch (SdkException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); + } finally { + if (s3Client != null) { + CleanupUtils.close(s3Client, null); + } + } + + boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty() + : ((ListObjectsV2Response) response).contents().isEmpty(); + if (isEmpty && collector.shouldWarn()) { + Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); + collector.warn(warning); + } + + // Returns 200 only in case the bucket exists, otherwise, throws an exception. However, to + // ensure coverage, check if the result is successful as well and not only catch exceptions + if (!response.sdkHttpResponse().isSuccessful()) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); + } + if (isDeltaTable(configuration)) { + try { + validateDeltaTableExists(appCtx, configuration); + } catch (AlgebricksException e) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e); + } + } + } + /** * Checks for a single object in the specified bucket to determine if the bucket is empty or not. 
* @@ -96,7 +379,7 @@ public class S3Utils { // Prepare to retrieve the objects List<S3Object> filesOnly; String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx, configuration); + S3Client s3Client = buildClient(appCtx, configuration); String prefix = getPrefix(configuration); try { @@ -243,7 +526,7 @@ public class S3Utils { Map<String, String> configuration, String container, String prefix) throws CompilationException, HyracksDataException { // create s3 client - S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx, configuration); + S3Client s3Client = buildClient(appCtx, configuration); // fetch all the s3 objects return listS3ObjectsOfSingleDepth(s3Client, container, prefix); } @@ -300,12 +583,26 @@ public class S3Utils { + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); } - /** - * Generates a random external ID to be used in cross-account role assumption. - * - * @return external id - */ - public static String generateExternalId() { - return UUID.randomUUID().toString(); + public static boolean isRetryableError(String errorCode) { + return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN); + } + + public static boolean validateAndGetPathStyleAddressing(String pathStyleAddressing, String endpoint) + throws CompilationException { + if (pathStyleAddressing == null) { + return endpoint != null && !endpoint.isEmpty(); + } + validatePathStyleAddressing(pathStyleAddressing); + return "true".equalsIgnoreCase(pathStyleAddressing); + } + + public static void validatePathStyleAddressing(String pathStyleAddressing) throws CompilationException { + if (pathStyleAddressing == null) { + return; + } + if (!"true".equalsIgnoreCase(pathStyleAddressing) && !"false".equalsIgnoreCase(pathStyleAddressing)) { + throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, PATH_STYLE_ADDRESSING_FIELD_NAME, + "true, false"); + } } } diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java index 91afbd8279..2f8791ed41 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.external.input.record.reader.awss3; -import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR; -import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN; +import static org.apache.asterix.external.util.aws.AwsConstants.ERROR_INTERNAL_ERROR; +import static org.apache.asterix.external.util.aws.AwsConstants.ERROR_SLOW_DOWN; import java.lang.reflect.Field; import java.lang.reflect.Method;
