This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit ba961ddf07b7ffef21f1890cb5374235199cf2d2 Author: Hussain Towaileb <[email protected]> AuthorDate: Tue Feb 4 16:33:11 2025 +0300 [ASTERIXDB-3565][EXT]: Add impersonate service account auth for GCS - user model changes: no - storage format changes: no - interface changes: no Details: Add support for impersonating a service account as an authentication method for GCS links. This works by providing a target service account to impersonate and providing source credentials (ours) to use to impersonate the account. There is no need to store any temporary credentials/tokens, as the SDK automatically picks up the token generated and, if expired, automatically refreshes it for subsequent requests. Ext-ref: MB-65121 Change-Id: Ie1d69faa45a03550c8e0fe66eb18c2ae53a8454a Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19444 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> --- .../external/ExternalCredentialsCacheUpdater.java | 9 +- .../api/cluster_state_1/cluster_state_1.1.regexadm | 1 + .../cluster_state_1_full.1.regexadm | 1 + .../cluster_state_1_less.1.regexadm | 1 + .../cloud/writer/GCSExternalFileWriterFactory.java | 4 +- .../asterix/common/config/ExternalProperties.java | 12 +- .../asterix/common/exceptions/ErrorCode.java | 2 + .../src/main/resources/asx_errormsg/en.properties | 2 + .../input/record/reader/gcs/GCSInputStream.java | 9 +- .../record/reader/gcs/GCSInputStreamFactory.java | 8 +- .../reader/gcs/delta/GCSDeltaReaderFactory.java | 4 +- .../gcs/parquet/GCSParquetReaderFactory.java | 7 +- .../asterix/external/util/ExternalDataUtils.java | 4 +- .../asterix/external/util/aws/s3/S3AuthUtils.java | 74 ++---- .../external/util/google/gcs/GCSAuthUtils.java | 294 +++++++++++++++++++++ .../external/util/google/gcs/GCSConstants.java | 13 +- .../asterix/external/util/google/gcs/GCSUtils.java | 165 +----------- 17 files changed, 
390 insertions(+), 220 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java index 4c382d0d03..10f536bb71 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java @@ -78,14 +78,15 @@ public class ExternalCredentialsCacheUpdater implements IExternalCredentialsCach String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE); if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(type)) { - credentials = generateAwsCredentials(configuration); + return generateAwsCredentials(configuration); + } else { + // this should never happen + throw new IllegalArgumentException("Unsupported external source type: " + type); } - - return credentials; } // TODO: this can probably be refactored out into something that is AWS-specific - private Object generateAwsCredentials(Map<String, String> configuration) + private AwsSessionCredentials generateAwsCredentials(Map<String, String> configuration) throws HyracksDataException, CompilationException { String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID); AwsSessionCredentials credentials; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index 3ffb080f4a..c6d7ba9de5 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -61,6 +61,7 @@ "compiler\.textsearchmemory" : 163840, "compiler\.windowmemory" : 
196608, "default\.dir" : "target/io/dir/asterixdb", + "gcp.impersonate.service.account.duration" : 900, "library\.deploy\.timeout" : 1800, "log\.dir" : "logs/", "log\.level" : "DEBUG", diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index aeb3cea25b..562e195afc 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -61,6 +61,7 @@ "compiler\.textsearchmemory" : 163840, "compiler\.windowmemory" : 196608, "default\.dir" : "target/io/dir/asterixdb", + "gcp.impersonate.service.account.duration" : 900, "library\.deploy\.timeout" : 1800, "log\.dir" : "logs/", "log\.level" : "WARN", diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm index b475612291..132fa5b281 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -61,6 +61,7 @@ "compiler\.textsearchmemory" : 163840, "compiler\.windowmemory" : 196608, "default\.dir" : "target/io/dir/asterixdb", + "gcp.impersonate.service.account.duration" : 900, "library\.deploy\.timeout" : 1800, "log\.dir" : "logs/", "log\.level" : "WARN", diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java 
index 3a34365fac..02e28810c8 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java @@ -27,7 +27,7 @@ import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.google.gcs.GCSUtils; +import org.apache.asterix.external.util.google.gcs.GCSAuthUtils; import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration; import org.apache.asterix.runtime.writer.IExternalFileWriter; import org.apache.asterix.runtime.writer.IExternalFileWriterFactory; @@ -66,7 +66,7 @@ public final class GCSExternalFileWriterFactory extends AbstractCloudExternalFil @Override ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException { GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize); - return new GCSCloudClient(config, GCSUtils.buildClient(configuration), + return new GCSCloudClient(config, GCSAuthUtils.buildClient(appCtx, configuration), ICloudGuardian.NoOpCloudGuardian.INSTANCE); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java index 62437b20bb..d233606fa6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java @@ -64,7 +64,12 @@ public class ExternalProperties extends AbstractProperties { 75, "Percentage of duration passed before assume role credentials need to be refreshed, the value ranges " + "from 25 to 90, default 
is 75. For example, if the value is set to 65, this means the " - + "credentials need to be refreshed if 65% of the total expiration duration is already passed"); + + "credentials need to be refreshed if 65% of the total expiration duration is already passed"), + GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION( + getRangedIntegerType(60, 3600), + 900, + "GCS impersonating service account duration in seconds. " + + "Range from 60 seconds (1 min) to 3600 seconds (1 hour)"); private final IOptionType type; private final Object defaultValue; @@ -94,6 +99,7 @@ public class ExternalProperties extends AbstractProperties { case AZURE_REQUEST_TIMEOUT: case AWS_ASSUME_ROLE_DURATION: case AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE: + case GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION: return Section.COMMON; case CC_JAVA_OPTS: case NC_JAVA_OPTS: @@ -182,4 +188,8 @@ public class ExternalProperties extends AbstractProperties { public int getAwsRefreshAssumeRoleThresholdPercentage() { return accessor.getInt(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE); } + + public int getGcpImpersonateServiceAccountDuration() { + return accessor.getInt(Option.GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 46d63bb072..463695e040 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -326,6 +326,8 @@ public enum ErrorCode implements IError { CSV_INVALID_FORCE_QUOTE(1218), CSV_INVALID_ESCAPE(1219), CANNOT_TRUNCATE_DATASET_TYPE(1220), + NO_VALID_AUTHENTICATION_PARAMS_PROVIDED(1221), + NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT(1222), // Feed errors DATAFLOW_ILLEGAL_STATE(3001), diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index d6a171ff78..df08016e31 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -328,6 +328,8 @@ 1218 = '%1$s' is not a valid force-quote input. The length of a force-quote input should be 1 character 1219 = '%1$s' is not a valid escape. The length of a escape should be 1 1220 = Cannot truncate %1$s '%2$s' +1221 = No valid authentication parameters were provided +1222 = No valid authentication parameters were provided to impersonate service account # Feed Errors 3001 = Illegal state. diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java index 89da06512f..1ef3fcde9c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java @@ -27,13 +27,14 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.google.gcs.GCSUtils; +import 
org.apache.asterix.external.util.google.gcs.GCSAuthUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.CleanupUtils; @@ -46,13 +47,15 @@ import com.google.cloud.storage.Storage; public class GCSInputStream extends AbstractExternalInputStream { + private final IApplicationContext ncAppCtx; private final Storage client; private final String container; private static final int MAX_ATTEMPTS = 5; // We try a total of 5 times in case of retryable errors - public GCSInputStream(Map<String, String> configuration, List<String> filePaths, + public GCSInputStream(IApplicationContext ncAppCtx, Map<String, String> configuration, List<String> filePaths, IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException { super(configuration, filePaths, valueEmbedder); + this.ncAppCtx = ncAppCtx; this.client = buildClient(configuration); this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); } @@ -136,7 +139,7 @@ public class GCSInputStream extends AbstractExternalInputStream { private Storage buildClient(Map<String, String> configuration) throws HyracksDataException { try { - return GCSUtils.buildClient(configuration); + return GCSAuthUtils.buildClient(ncAppCtx, configuration); } catch (CompilationException ex) { throw HyracksDataException.create(ex); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java index b6ad3cdea1..2de55a0857 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java @@ -23,6 +23,7 @@ import java.util.List; 
import java.util.Map; import java.util.PriorityQueue; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.external.IExternalFilterEvaluator; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.external.api.AsterixInputStream; @@ -47,7 +48,9 @@ public class GCSInputStreamFactory extends AbstractExternalInputStreamFactory { public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException { IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder(); int partition = context.getPartition(); - return new GCSInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(), + IApplicationContext ncAppCtx = (IApplicationContext) context.getTaskContext().getJobletContext() + .getServiceContext().getApplicationContext(); + return new GCSInputStream(ncAppCtx, configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(), valueEmbedder); } @@ -65,7 +68,8 @@ public class GCSInputStreamFactory extends AbstractExternalInputStreamFactory { configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot()); // get the items - List<Blob> filesOnly = GCSUtils.listItems(configuration, includeExcludeMatcher, warningCollector, + IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext(); + List<Blob> filesOnly = GCSUtils.listItems(appCtx, configuration, includeExcludeMatcher, warningCollector, externalDataPrefix, evaluator); // Distribute work load amongst the partitions diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java index db7673caad..301bfc4fa8 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.google.gcs.GCSAuthUtils; import org.apache.asterix.external.util.google.gcs.GCSUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -37,7 +38,8 @@ public class GCSDeltaReaderFactory extends DeltaReaderFactory { @Override protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration) throws AlgebricksException { - GCSUtils.configureHdfsJobConf(conf, configuration); + int numberOfPartitions = getPartitionConstraint().getLocations().length; + GCSAuthUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java index 17cad3e2a0..874c3bd78f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.asterix.common.api.IApplicationContext; import 
org.apache.asterix.common.external.IExternalFilterEvaluator; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.external.input.HDFSDataSourceFactory; @@ -30,6 +31,7 @@ import org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataPrefix; import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.google.gcs.GCSAuthUtils; import org.apache.asterix.external.util.google.gcs.GCSConstants; import org.apache.asterix.external.util.google.gcs.GCSUtils; import org.apache.hadoop.mapred.JobConf; @@ -59,7 +61,8 @@ public class GCSParquetReaderFactory extends HDFSDataSourceFactory { configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot()); String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - List<Blob> filesOnly = GCSUtils.listItems(configuration, includeExcludeMatcher, warningCollector, + IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext(); + List<Blob> filesOnly = GCSUtils.listItems(appCtx, configuration, includeExcludeMatcher, warningCollector, externalDataPrefix, evaluator); // get path @@ -71,7 +74,7 @@ public class GCSParquetReaderFactory extends HDFSDataSourceFactory { // configure hadoop input splits JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory); int numberOfPartitions = getPartitionConstraint().getLocations().length; - GCSUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions); + GCSAuthUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions); configureHdfsConf(conf, configuration); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 0fed43f7e3..591829bf3d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -35,7 +35,7 @@ import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_ import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf; import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties; import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties; -import static org.apache.asterix.external.util.google.gcs.GCSUtils.configureHdfsJobConf; +import static org.apache.asterix.external.util.google.gcs.GCSAuthUtils.configureHdfsJobConf; import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE; @@ -727,7 +727,7 @@ public class ExternalDataUtils { validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx); break; case ExternalDataConstants.KEY_ADAPTER_NAME_GCS: - validateProperties(configuration, srcLoc, collector); + validateProperties(appCtx, configuration, srcLoc, collector); break; case ExternalDataConstants.KEY_ADAPTER_NAME_HDFS: HDFSUtils.validateProperties(configuration, srcLoc, collector); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java index f6ca480dcb..035415d9c9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java 
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java @@ -105,7 +105,7 @@ import software.amazon.awssdk.services.sts.model.Credentials; public class S3AuthUtils { enum AuthenticationType { ANONYMOUS, - ARN, + ARN_ASSUME_ROLE, INSTANCE_PROFILE, ACCESS_KEYS, BAD_AUTHENTICATION @@ -120,7 +120,8 @@ public class S3AuthUtils { } public static boolean isArnAssumedRoleExpiredToken(Map<String, String> configuration, String errorCode) { - return ERROR_EXPIRED_TOKEN.equals(errorCode) && getAuthenticationType(configuration) == AuthenticationType.ARN; + return ERROR_EXPIRED_TOKEN.equals(errorCode) + && getAuthenticationType(configuration) == AuthenticationType.ARN_ASSUME_ROLE; } /** @@ -168,7 +169,7 @@ public class S3AuthUtils { switch (authenticationType) { case ANONYMOUS: return AnonymousCredentialsProvider.create(); - case ARN: + case ARN_ASSUME_ROLE: return getTrustAccountCredentials(appCtx, configuration); case INSTANCE_PROFILE: return getInstanceProfileCredentials(configuration); @@ -206,7 +207,7 @@ public class S3AuthUtils { if (noAuth(configuration)) { return AuthenticationType.ANONYMOUS; } else if (roleArn != null) { - return AuthenticationType.ARN; + return AuthenticationType.ARN_ASSUME_ROLE; } else if (instanceProfile != null) { return AuthenticationType.INSTANCE_PROFILE; } else if (accessKeyId != null || secretAccessKey != null) { @@ -232,11 +233,11 @@ public class S3AuthUtils { } /** - * Returns the cached credentials if valid, otherwise, generates new credentials by assume a role + * Returns the cached credentials if valid, otherwise, generates new credentials * * @param appCtx application context * @param configuration configuration - * @return returns the cached credentials if valid, otherwise, generates new credentials by assume a role + * @return returns the cached credentials if valid, otherwise, generates new credentials * @throws CompilationException CompilationException */ public static 
AwsCredentialsProvider getTrustAccountCredentials(IApplicationContext appCtx, @@ -278,20 +279,8 @@ public class S3AuthUtils { if (externalId != null) { builder.externalId(externalId); } - - // credentials to be used to assume the role - AwsCredentialsProvider credentialsProvider; AssumeRoleRequest request = builder.build(); - String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME); - String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); - String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); - if ("true".equalsIgnoreCase(instanceProfile)) { - credentialsProvider = getInstanceProfileCredentials(configuration, true); - } else if (accessKeyId != null && secretAccessKey != null) { - credentialsProvider = getAccessKeyCredentials(configuration, true); - } else { - throw new CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION); - } + AwsCredentialsProvider credentialsProvider = getCredentialsToAssumeRole(configuration); // assume the role from the provided arn try (StsClient stsClient = @@ -305,13 +294,22 @@ public class S3AuthUtils { } } - private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration) + private static AwsCredentialsProvider getCredentialsToAssumeRole(Map<String, String> configuration) throws CompilationException { - return getInstanceProfileCredentials(configuration, false); + String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME); + String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); + String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); + if (instanceProfile != null) { + return getInstanceProfileCredentials(configuration); + } else if (accessKeyId != null || secretAccessKey != null) { + return getAccessKeyCredentials(configuration); + } else { + throw new CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION); + } } - private 
static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration, - boolean assumeRoleAuthentication) throws CompilationException { + private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration) + throws CompilationException { String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME); // only "true" value is allowed @@ -319,24 +317,17 @@ public class S3AuthUtils { throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INSTANCE_PROFILE_FIELD_NAME, "true"); } - if (!assumeRoleAuthentication) { - String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, - SESSION_TOKEN_FIELD_NAME); - if (notAllowed != null) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, - INSTANCE_PROFILE_FIELD_NAME); - } + String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, + SESSION_TOKEN_FIELD_NAME); + if (notAllowed != null) { + throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, + INSTANCE_PROFILE_FIELD_NAME); } return InstanceProfileCredentialsProvider.create(); } private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration) throws CompilationException { - return getAccessKeyCredentials(configuration, false); - } - - private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration, - boolean assumeRoleAuthentication) throws CompilationException { String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME); @@ -350,14 +341,6 @@ public class S3AuthUtils { ACCESS_KEY_ID_FIELD_NAME); } - if (!assumeRoleAuthentication) { - String notAllowed = getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, EXTERNAL_ID_FIELD_NAME); - if (notAllowed 
!= null) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, - INSTANCE_PROFILE_FIELD_NAME); - } - } - // use session token if provided if (sessionToken != null) { return StaticCredentialsProvider @@ -423,13 +406,13 @@ public class S3AuthUtils { * @param configuration external details configuration */ private static void setHadoopCredentials(IApplicationContext appCtx, JobConf jobConf, - Map<String, String> configuration) { + Map<String, String> configuration) throws CompilationException { AuthenticationType authenticationType = getAuthenticationType(configuration); switch (authenticationType) { case ANONYMOUS: jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS); break; - case ARN: + case ARN_ASSUME_ROLE: jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ASSUMED_ROLE); jobConf.set(HADOOP_ASSUME_ROLE_ARN, configuration.get(ROLE_ARN_FIELD_NAME)); jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, configuration.get(EXTERNAL_ID_FIELD_NAME)); @@ -458,6 +441,7 @@ public class S3AuthUtils { } break; case BAD_AUTHENTICATION: + throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java new file mode 100644 index 0000000000..4473ad759a --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.util.google.gcs; + +import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR; +import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_GCS_PROTOCOL; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.HadoopAuthServiceAccount.IMPERSONATE_SERVICE_ACCOUNT; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.exceptions.CompilationException; +import 
org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.HDFSUtils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.json.JsonReadFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.auth.Credentials; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ImpersonatedCredentials; +import com.google.cloud.NoCredentials; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; + +public class GCSAuthUtils { + enum AuthenticationType { + ANONYMOUS, + IMPERSONATE_SERVICE_ACCOUNT, + APPLICATION_DEFAULT_CREDENTIALS, + SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS, + BAD_AUTHENTICATION + } + + private static final ObjectMapper JSON_CREDENTIALS_OBJECT_MAPPER = new ObjectMapper(); + private static final List<String> READ_WRITE_SCOPE_PERMISSION = + Collections.singletonList("https://www.googleapis.com/auth/devstorage.read_write"); + static { + JSON_CREDENTIALS_OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true); + } + + private GCSAuthUtils() { + throw new AssertionError("do not instantiate"); + } + + /** + * Builds the client using the provided configuration + * + * @param appCtx application context + * @param configuration properties + * @return Storage client + * @throws CompilationException CompilationException + */ + public static Storage buildClient(IApplicationContext appCtx, Map<String, String> configuration) + throws CompilationException { + String endpoint = configuration.get(ENDPOINT_FIELD_NAME); + + Credentials credentials = buildCredentials(appCtx, configuration); + StorageOptions.Builder builder = StorageOptions.newBuilder(); + 
builder.setCredentials(credentials); + + if (endpoint != null) { + builder.setHost(endpoint); + } + + return builder.build().getService(); + } + + public static Credentials buildCredentials(IApplicationContext appCtx, Map<String, String> configuration) throws CompilationException { + AuthenticationType authenticationType = getAuthenticationType(configuration); + return switch (authenticationType) { + case ANONYMOUS -> NoCredentials.getInstance(); + case IMPERSONATE_SERVICE_ACCOUNT -> getImpersonatedServiceAccountCredentials(appCtx, configuration); + case APPLICATION_DEFAULT_CREDENTIALS -> getApplicationDefaultCredentials(configuration); + case SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS -> getServiceAccountKeyCredentials(configuration); + case BAD_AUTHENTICATION -> throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED); + }; + } + + private static AuthenticationType getAuthenticationType(Map<String, String> configuration) { + String impersonateServiceAccount = configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME); + String applicationDefaultCredentials = configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME); + String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME); + + if (noAuth(configuration)) { + return AuthenticationType.ANONYMOUS; + } else if (impersonateServiceAccount != null) { + return AuthenticationType.IMPERSONATE_SERVICE_ACCOUNT; + } else if (applicationDefaultCredentials != null) { + return AuthenticationType.APPLICATION_DEFAULT_CREDENTIALS; + } else if (jsonCredentials != null) { + return AuthenticationType.SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS; + } else { + return AuthenticationType.BAD_AUTHENTICATION; + } + } + + private static boolean noAuth(Map<String, String> configuration) { + return getNonNull(configuration, APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME, JSON_CREDENTIALS_FIELD_NAME, + IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME) == null; + } + + /** + * Returns the cached credentials if valid, otherwise, 
generates new credentials + * + * @param appCtx application context + * @param configuration configuration + * @return returns the cached credentials if valid, otherwise, generates new credentials + * @throws CompilationException CompilationException + */ + public static GoogleCredentials getImpersonatedServiceAccountCredentials(IApplicationContext appCtx, + Map<String, String> configuration) throws CompilationException { + GoogleCredentials sourceCredentials = getCredentialsToImpersonateServiceAccount(configuration); + String impersonateServiceAccount = configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME); + int duration = appCtx.getExternalProperties().getGcpImpersonateServiceAccountDuration(); + + // Create impersonated credentials + return ImpersonatedCredentials.create(sourceCredentials, impersonateServiceAccount, null, + READ_WRITE_SCOPE_PERMISSION, duration); + } + + private static GoogleCredentials getCredentialsToImpersonateServiceAccount(Map<String, String> configuration) + throws CompilationException { + String applicationDefaultCredentials = configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME); + String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME); + + if (applicationDefaultCredentials != null) { + return getApplicationDefaultCredentials(configuration); + } else if (jsonCredentials != null) { + return getServiceAccountKeyCredentials(configuration); + } else { + throw new CompilationException( + ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT); + } + } + + private static GoogleCredentials getApplicationDefaultCredentials(Map<String, String> configuration) + throws CompilationException { + try { + String notAllowed = getNonNull(configuration, JSON_CREDENTIALS_FIELD_NAME); + if (notAllowed != null) { + throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, + APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME); + } + return GoogleCredentials.getApplicationDefault(); + } 
catch (Exception ex) { + throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); + } + } + + private static GoogleCredentials getServiceAccountKeyCredentials(Map<String, String> configuration) + throws CompilationException { + String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME); + try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) { + return GoogleCredentials.fromStream(credentialsStream); + } catch (IOException ex) { + throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); + } catch (Exception ex) { + throw new CompilationException(EXTERNAL_SOURCE_ERROR, + "Encountered an issue while processing the JSON credentials. Please ensure the provided credentials are valid."); + } + } + + public static void configureHdfsJobConf(JobConf jobConf, Map<String, String> configuration) + throws AlgebricksException { + configureHdfsJobConf(jobConf, configuration, 0); + } + + /** + * Builds the client using the provided configuration + * + * @param configuration properties + * @param numberOfPartitions number of partitions + */ + public static void configureHdfsJobConf(JobConf jobConf, Map<String, String> configuration, int numberOfPartitions) + throws AlgebricksException { + setHadoopCredentials(jobConf, configuration); + + // set endpoint if provided, default is https://storage.googleapis.com/ + String endpoint = configuration.get(ENDPOINT_FIELD_NAME); + if (endpoint != null) { + jobConf.set(HADOOP_ENDPOINT, endpoint); + } + + // disable caching FileSystem + HDFSUtils.disableHadoopFileSystemCache(jobConf, HADOOP_GCS_PROTOCOL); + + // TODO(htowaileb): make configurable, in case we hit rate limits then we can reduce it, default is 15 + if (numberOfPartitions != 0) { + jobConf.set(GCSConstants.MAX_BATCH_THREADS, String.valueOf(numberOfPartitions)); + } + + // recommended to be disabled by GCP hadoop team + jobConf.set(GCSConstants.HADOOP_SUPPORT_COMPRESSED, 
ExternalDataConstants.FALSE); + } + + /** + * Sets the credentials provider type and the credentials to hadoop based on the provided configuration + * + * @param jobConf hadoop job config + * @param configuration external details configuration + * @throws CompilationException CompilationException + */ + private static void setHadoopCredentials(JobConf jobConf, Map<String, String> configuration) + throws CompilationException { + AuthenticationType authenticationType = getAuthenticationType(configuration); + switch (authenticationType) { + case ANONYMOUS: + jobConf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED); + break; + case IMPERSONATE_SERVICE_ACCOUNT: + String impersonateServiceAccount = configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME); + jobConf.set(IMPERSONATE_SERVICE_ACCOUNT, impersonateServiceAccount); + setJsonCredentials(jobConf, configuration); + break; + case SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS: + setJsonCredentials(jobConf, configuration); + break; + case BAD_AUTHENTICATION: + throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED); + } + } + + /** + * Sets the Json credentials to hadoop job configuration + * Note: + * Setting these values instead of HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported + * in com.google.cloud.bigdataoss:util-hadoop only up to version hadoop3-2.2.x and is removed in + * version 3.x.y, which also removed support for hadoop-2 + * + * @param jobConf hadoop job config + * @param configuration external details configuration + * @throws CompilationException CompilationException + */ + private static void setJsonCredentials(JobConf jobConf, Map<String, String> configuration) + throws CompilationException { + try { + String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME); + JsonNode jsonCreds = JSON_CREDENTIALS_OBJECT_MAPPER.readTree(jsonCredentials); + jobConf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID, + 
jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText()); + jobConf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY, + jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText()); + jobConf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL, + jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText()); + } catch (JsonProcessingException e) { + throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e, "Unable to parse Json Credentials", + getMessageOrToString(e)); + } + } + + private static String getNonNull(Map<String, String> configuration, String... fieldNames) { + for (String fieldName : fieldNames) { + if (configuration.get(fieldName) != null) { + return fieldName; + } + } + return null; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java index 739dbde549..19fd74fadb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java @@ -40,23 +40,21 @@ public class GCSConstants { } public static final String APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME = "applicationDefaultCredentials"; + public static final String IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME = "impersonateServiceAccount"; public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials"; public static final String ENDPOINT_FIELD_NAME = "endpoint"; public static final String STORAGE_PREFIX = "prefix"; - /* - * Hadoop internal configuration - */ + // hadoop internal configuration public static final String HADOOP_GCS_PROTOCOL = "gs"; + public static final String MAX_BATCH_THREADS = "fs.gs.batch.threads"; + public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url"; + public static final String 
HADOOP_SUPPORT_COMPRESSED = "fs.gs.inputstream.support.gzip.encoding.enable"; // hadoop credentials public static final String HADOOP_AUTH_TYPE = "fs.gs.auth.type"; public static final String HADOOP_AUTH_UNAUTHENTICATED = "UNAUTHENTICATED"; - // gs hadoop parameters - public static final String HADOOP_SUPPORT_COMPRESSED = "fs.gs.inputstream.support.gzip.encoding.enable"; - public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url"; - public static class JsonCredentials { public static final String PRIVATE_KEY_ID = "private_key_id"; public static final String PRIVATE_KEY = "private_key"; @@ -64,6 +62,7 @@ public class GCSConstants { } public static class HadoopAuthServiceAccount { + public static final String IMPERSONATE_SERVICE_ACCOUNT = "fs.gs.auth.impersonation.service.account"; public static final String PRIVATE_KEY_ID = "fs.gs.auth.service.account.private.key.id"; public static final String PRIVATE_KEY = "fs.gs.auth.service.account.private.key"; public static final String CLIENT_EMAIL = "fs.gs.auth.service.account.email"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java index d768c230ce..032f537bc3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java @@ -18,32 +18,20 @@ */ package org.apache.asterix.external.util.google.gcs; -import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR; -import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE; -import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT; import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; import static 
org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties; import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; -import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME; -import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY; -import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME; -import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE; -import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED; -import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT; -import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_GCS_PROTOCOL; -import static org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME; +import static org.apache.asterix.external.util.google.gcs.GCSAuthUtils.buildClient; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.BiPredicate; import java.util.regex.Matcher; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.external.IExternalFilterEvaluator; @@ -51,25 +39,15 @@ import org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataPrefix; import org.apache.asterix.external.util.ExternalDataUtils; -import 
org.apache.asterix.external.util.HDFSUtils; -import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.exceptions.Warning; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.json.JsonReadFeature; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.gax.paging.Page; -import com.google.auth.oauth2.GoogleCredentials; -import com.google.cloud.BaseServiceException; -import com.google.cloud.NoCredentials; import com.google.cloud.storage.Blob; import com.google.cloud.storage.Storage; -import com.google.cloud.storage.StorageOptions; public class GCSUtils { private GCSUtils() { @@ -77,73 +55,17 @@ public class GCSUtils { } - private static final ObjectMapper JSON_CREDENTIALS_OBJECT_MAPPER = new ObjectMapper(); - static { - JSON_CREDENTIALS_OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true); - } - - /** - * Builds the client using the provided configuration - * - * @param configuration properties - * @return clientasterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java - * @throws CompilationException CompilationException - */ - public static Storage buildClient(Map<String, String> configuration) throws CompilationException { - String applicationDefaultCredentials = configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME); - String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME); - String endpoint = configuration.get(ENDPOINT_FIELD_NAME); - - StorageOptions.Builder builder = StorageOptions.newBuilder(); - builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY); - - // 
default credentials provider - if (applicationDefaultCredentials != null) { - // only "true" value is allowed - if (!applicationDefaultCredentials.equalsIgnoreCase("true")) { - throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, - APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME, "true"); - } - - // no other authentication parameters are allowed - if (jsonCredentials != null) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, JSON_CREDENTIALS_FIELD_NAME, - APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME); - } - - try { - builder.setCredentials(GoogleCredentials.getApplicationDefault()); - } catch (Exception ex) { - throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); - } - } else if (jsonCredentials != null) { - try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) { - builder.setCredentials(GoogleCredentials.fromStream(credentialsStream)); - } catch (IOException ex) { - throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); - } catch (Exception ex) { - throw new CompilationException(EXTERNAL_SOURCE_ERROR, - "Encountered an issue while processing the JSON credentials. 
Please ensure the provided credentials are valid."); - } - } else { - builder.setCredentials(NoCredentials.getInstance()); - } - - if (endpoint != null) { - builder.setHost(endpoint); - } - - return builder.build().getService(); - } - /** * Validate external dataset properties * + * @param appCtx application context * @param configuration properties + * @param srcLoc source location + * @param collector warning collector * @throws CompilationException Compilation exception */ - public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc, - IWarningCollector collector) throws CompilationException { + public static void validateProperties(IApplicationContext appCtx, Map<String, String> configuration, + SourceLocation srcLoc, IWarningCollector collector) throws CompilationException { if (isDeltaTable(configuration)) { validateDeltaTableProperties(configuration); } @@ -154,7 +76,6 @@ public class GCSUtils { validateIncludeExclude(configuration); try { - // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation new ExternalDataPrefix(configuration); } catch (AlgebricksException ex) { throw new CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex); @@ -162,36 +83,33 @@ public class GCSUtils { String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - try { + try (Storage storage = buildClient(appCtx, configuration)) { Storage.BlobListOption limitOption = Storage.BlobListOption.pageSize(1); Storage.BlobListOption prefixOption = Storage.BlobListOption.prefix(getPrefix(configuration)); - Storage storage = buildClient(configuration); Page<Blob> items = storage.list(container, limitOption, prefixOption); if (!items.iterateAll().iterator().hasNext() && collector.shouldWarn()) { Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); collector.warn(warning); } - } catch (CompilationException ex) { - throw ex; } 
catch (Exception ex) { throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); } } - public static List<Blob> listItems(Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher, - IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix, - IExternalFilterEvaluator evaluator) throws CompilationException, HyracksDataException { + public static List<Blob> listItems(IApplicationContext appCtx, Map<String, String> configuration, + IncludeExcludeMatcher includeExcludeMatcher, IWarningCollector warningCollector, + ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) + throws CompilationException, HyracksDataException { // Prepare to retrieve the objects List<Blob> filesOnly = new ArrayList<>(); String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - Storage gcs = buildClient(configuration); Storage.BlobListOption options = Storage.BlobListOption.prefix(ExternalDataUtils.getPrefix(configuration)); Page<Blob> items; - try { + try (Storage gcs = buildClient(appCtx, configuration)) { items = gcs.list(container, options); - } catch (BaseServiceException ex) { + } catch (Exception ex) { throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); } @@ -224,61 +142,6 @@ public class GCSUtils { } } - public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration) - throws AlgebricksException { - configureHdfsJobConf(conf, configuration, 0); - } - - /** - * Builds the client using the provided configuration - * - * @param configuration properties - * @param numberOfPartitions number of partitions in the cluster - */ - public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions) - throws AlgebricksException { - String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME); - String endpoint = configuration.get(ENDPOINT_FIELD_NAME); 
- - // disable caching FileSystem - HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_GCS_PROTOCOL); - - // TODO(htowaileb): needs further testing, recommended to disable by gcs-hadoop team - conf.set(GCSConstants.HADOOP_SUPPORT_COMPRESSED, ExternalDataConstants.FALSE); - - // TODO(htowaileb): needs further testing - // set number of threads - // conf.set(GCSConstants.HADOOP_MAX_REQUESTS_PER_BATCH, String.valueOf(numberOfPartitions)); - // conf.set(GCSConstants.HADOOP_BATCH_THREADS, String.valueOf(numberOfPartitions)); - - // authentication method - if (jsonCredentials == null) { - // anonymous access - conf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED); - } else { - try { - JsonNode jsonCreds = JSON_CREDENTIALS_OBJECT_MAPPER.readTree(jsonCredentials); - // Setting these values instead of HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported - // in com.google.cloud.bigdataoss:util-hadoop only up to version hadoop3-2.2.x and is removed in - // version 3.x.y, which also removed support for hadoop-2 - conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID, - jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText()); - conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY, - jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText()); - conf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL, - jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText()); - } catch (JsonProcessingException e) { - throw CompilationException.create(EXTERNAL_SOURCE_ERROR, "Unable to parse Json Credentials", - getMessageOrToString(e)); - } - } - - // set endpoint if provided, default is https://storage.googleapis.com/ - if (endpoint != null) { - conf.set(HADOOP_ENDPOINT, endpoint); - } - } - public static String getPath(Map<String, String> configuration) { return GCSConstants.HADOOP_GCS_PROTOCOL + "://" + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
