This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit ba961ddf07b7ffef21f1890cb5374235199cf2d2
Author: Hussain Towaileb <[email protected]>
AuthorDate: Tue Feb 4 16:33:11 2025 +0300

    [ASTERIXDB-3565][EXT]: Add impersonate service account auth for GCS
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Add support for impersonating a service account as
    an authentication method for GCS links. This works
    by providing a target service account to impersonate
    and source credentials (ours) to use when
    impersonating the account. There is no need to store
    any temporary credentials/tokens, as the SDK automatically
    picks up the generated token and, if it has expired,
    automatically refreshes it for subsequent requests.
    
    Ext-ref: MB-65121
    Change-Id: Ie1d69faa45a03550c8e0fe66eb18c2ae53a8454a
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19444
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Reviewed-by: Hussain Towaileb <[email protected]>
---
 .../external/ExternalCredentialsCacheUpdater.java  |   9 +-
 .../api/cluster_state_1/cluster_state_1.1.regexadm |   1 +
 .../cluster_state_1_full.1.regexadm                |   1 +
 .../cluster_state_1_less.1.regexadm                |   1 +
 .../cloud/writer/GCSExternalFileWriterFactory.java |   4 +-
 .../asterix/common/config/ExternalProperties.java  |  12 +-
 .../asterix/common/exceptions/ErrorCode.java       |   2 +
 .../src/main/resources/asx_errormsg/en.properties  |   2 +
 .../input/record/reader/gcs/GCSInputStream.java    |   9 +-
 .../record/reader/gcs/GCSInputStreamFactory.java   |   8 +-
 .../reader/gcs/delta/GCSDeltaReaderFactory.java    |   4 +-
 .../gcs/parquet/GCSParquetReaderFactory.java       |   7 +-
 .../asterix/external/util/ExternalDataUtils.java   |   4 +-
 .../asterix/external/util/aws/s3/S3AuthUtils.java  |  74 ++----
 .../external/util/google/gcs/GCSAuthUtils.java     | 294 +++++++++++++++++++++
 .../external/util/google/gcs/GCSConstants.java     |  13 +-
 .../asterix/external/util/google/gcs/GCSUtils.java | 165 +-----------
 17 files changed, 390 insertions(+), 220 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
index 4c382d0d03..10f536bb71 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -78,14 +78,15 @@ public class ExternalCredentialsCacheUpdater implements 
IExternalCredentialsCach
 
         String type = 
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
         if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(type)) {
-            credentials = generateAwsCredentials(configuration);
+            return generateAwsCredentials(configuration);
+        } else {
+            // this should never happen
+            throw new IllegalArgumentException("Unsupported external source 
type: " + type);
         }
-
-        return credentials;
     }
 
     // TODO: this can probably be refactored out into something that is 
AWS-specific
-    private Object generateAwsCredentials(Map<String, String> configuration)
+    private AwsSessionCredentials generateAwsCredentials(Map<String, String> 
configuration)
             throws HyracksDataException, CompilationException {
         String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
         AwsSessionCredentials credentials;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 3ffb080f4a..c6d7ba9de5 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -61,6 +61,7 @@
     "compiler\.textsearchmemory" : 163840,
     "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
+    "gcp.impersonate.service.account.duration" : 900,
     "library\.deploy\.timeout" : 1800,
     "log\.dir" : "logs/",
     "log\.level" : "DEBUG",
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index aeb3cea25b..562e195afc 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -61,6 +61,7 @@
     "compiler\.textsearchmemory" : 163840,
     "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
+    "gcp.impersonate.service.account.duration" : 900,
     "library\.deploy\.timeout" : 1800,
     "log\.dir" : "logs/",
     "log\.level" : "WARN",
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index b475612291..132fa5b281 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -61,6 +61,7 @@
     "compiler\.textsearchmemory" : 163840,
     "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
+    "gcp.impersonate.service.account.duration" : 900,
     "library\.deploy\.timeout" : 1800,
     "log\.dir" : "logs/",
     "log\.level" : "WARN",
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 3a34365fac..02e28810c8 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -27,7 +27,7 @@ import 
org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
@@ -66,7 +66,7 @@ public final class GCSExternalFileWriterFactory extends 
AbstractCloudExternalFil
     @Override
     ICloudClient createCloudClient(IApplicationContext appCtx) throws 
CompilationException {
         GCSClientConfig config = GCSClientConfig.of(configuration, 
writeBufferSize);
-        return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
+        return new GCSCloudClient(config, GCSAuthUtils.buildClient(appCtx, 
configuration),
                 ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index 62437b20bb..d233606fa6 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -64,7 +64,12 @@ public class ExternalProperties extends AbstractProperties {
                 75,
                 "Percentage of duration passed before assume role credentials 
need to be refreshed, the value ranges "
                         + "from 25 to 90, default is 75. For example, if the 
value is set to 65, this means the "
-                        + "credentials need to be refreshed if 65% of the 
total expiration duration is already passed");
+                        + "credentials need to be refreshed if 65% of the 
total expiration duration is already passed"),
+        GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION(
+                getRangedIntegerType(60, 3600),
+                900,
+                "GCS impersonating service account duration in seconds. "
+                        + "Range from 60 seconds (1 min) to 3600 seconds (1 
hour)");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -94,6 +99,7 @@ public class ExternalProperties extends AbstractProperties {
                 case AZURE_REQUEST_TIMEOUT:
                 case AWS_ASSUME_ROLE_DURATION:
                 case AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE:
+                case GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION:
                     return Section.COMMON;
                 case CC_JAVA_OPTS:
                 case NC_JAVA_OPTS:
@@ -182,4 +188,8 @@ public class ExternalProperties extends AbstractProperties {
     public int getAwsRefreshAssumeRoleThresholdPercentage() {
         return 
accessor.getInt(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE);
     }
+
+    public int getGcpImpersonateServiceAccountDuration() {
+        return 
accessor.getInt(Option.GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION);
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 46d63bb072..463695e040 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -326,6 +326,8 @@ public enum ErrorCode implements IError {
     CSV_INVALID_FORCE_QUOTE(1218),
     CSV_INVALID_ESCAPE(1219),
     CANNOT_TRUNCATE_DATASET_TYPE(1220),
+    NO_VALID_AUTHENTICATION_PARAMS_PROVIDED(1221),
+    
NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT(1222),
 
     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index d6a171ff78..df08016e31 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -328,6 +328,8 @@
 1218 = '%1$s' is not a valid force-quote input. The length of a force-quote 
input should be 1 character
 1219 = '%1$s' is not a valid escape. The length of a escape should be 1
 1220 = Cannot truncate %1$s '%2$s'
+1221 = No valid authentication parameters were provided
+1222 = No valid authentication parameters were provided to impersonate service 
account
 
 # Feed Errors
 3001 = Illegal state.
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
index 89da06512f..1ef3fcde9c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
@@ -27,13 +27,14 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
@@ -46,13 +47,15 @@ import com.google.cloud.storage.Storage;
 
 public class GCSInputStream extends AbstractExternalInputStream {
 
+    private final IApplicationContext ncAppCtx;
     private final Storage client;
     private final String container;
     private static final int MAX_ATTEMPTS = 5; // We try a total of 5 times in 
case of retryable errors
 
-    public GCSInputStream(Map<String, String> configuration, List<String> 
filePaths,
+    public GCSInputStream(IApplicationContext ncAppCtx, Map<String, String> 
configuration, List<String> filePaths,
             IExternalFilterValueEmbedder valueEmbedder) throws 
HyracksDataException {
         super(configuration, filePaths, valueEmbedder);
+        this.ncAppCtx = ncAppCtx;
         this.client = buildClient(configuration);
         this.container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
@@ -136,7 +139,7 @@ public class GCSInputStream extends 
AbstractExternalInputStream {
 
     private Storage buildClient(Map<String, String> configuration) throws 
HyracksDataException {
         try {
-            return GCSUtils.buildClient(configuration);
+            return GCSAuthUtils.buildClient(ncAppCtx, configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
index b6ad3cdea1..2de55a0857 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
@@ -47,7 +48,9 @@ public class GCSInputStreamFactory extends 
AbstractExternalInputStreamFactory {
     public AsterixInputStream createInputStream(IExternalDataRuntimeContext 
context) throws HyracksDataException {
         IExternalFilterValueEmbedder valueEmbedder = 
context.getValueEmbedder();
         int partition = context.getPartition();
-        return new GCSInputStream(configuration, 
partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
+        IApplicationContext ncAppCtx = (IApplicationContext) 
context.getTaskContext().getJobletContext()
+                .getServiceContext().getApplicationContext();
+        return new GCSInputStream(ncAppCtx, configuration, 
partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
                 valueEmbedder);
     }
 
@@ -65,7 +68,8 @@ public class GCSInputStreamFactory extends 
AbstractExternalInputStreamFactory {
         configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, 
externalDataPrefix.getRoot());
 
         // get the items
-        List<Blob> filesOnly = GCSUtils.listItems(configuration, 
includeExcludeMatcher, warningCollector,
+        IApplicationContext appCtx = (IApplicationContext) 
ctx.getApplicationContext();
+        List<Blob> filesOnly = GCSUtils.listItems(appCtx, configuration, 
includeExcludeMatcher, warningCollector,
                 externalDataPrefix, evaluator);
 
         // Distribute work load amongst the partitions
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
index db7673caad..301bfc4fa8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.asterix.common.api.IApplicationContext;
 import 
org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
 import org.apache.asterix.external.util.google.gcs.GCSUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -37,7 +38,8 @@ public class GCSDeltaReaderFactory extends DeltaReaderFactory 
{
     @Override
     protected void configureJobConf(IApplicationContext appCtx, JobConf conf, 
Map<String, String> configuration)
             throws AlgebricksException {
-        GCSUtils.configureHdfsJobConf(conf, configuration);
+        int numberOfPartitions = 
getPartitionConstraint().getLocations().length;
+        GCSAuthUtils.configureHdfsJobConf(conf, configuration, 
numberOfPartitions);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 17cad3e2a0..874c3bd78f 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
@@ -30,6 +31,7 @@ import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
 import org.apache.asterix.external.util.google.gcs.GCSConstants;
 import org.apache.asterix.external.util.google.gcs.GCSUtils;
 import org.apache.hadoop.mapred.JobConf;
@@ -59,7 +61,8 @@ public class GCSParquetReaderFactory extends 
HDFSDataSourceFactory {
         configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, 
externalDataPrefix.getRoot());
 
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        List<Blob> filesOnly = GCSUtils.listItems(configuration, 
includeExcludeMatcher, warningCollector,
+        IApplicationContext appCtx = (IApplicationContext) 
serviceCtx.getApplicationContext();
+        List<Blob> filesOnly = GCSUtils.listItems(appCtx, configuration, 
includeExcludeMatcher, warningCollector,
                 externalDataPrefix, evaluator);
 
         // get path
@@ -71,7 +74,7 @@ public class GCSParquetReaderFactory extends 
HDFSDataSourceFactory {
         // configure hadoop input splits
         JobConf conf = prepareHDFSConf(serviceCtx, configuration, 
filterEvaluatorFactory);
         int numberOfPartitions = 
getPartitionConstraint().getLocations().length;
-        GCSUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
+        GCSAuthUtils.configureHdfsJobConf(conf, configuration, 
numberOfPartitions);
         configureHdfsConf(conf, configuration);
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 0fed43f7e3..591829bf3d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -35,7 +35,7 @@ import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_
 import static 
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
 import static 
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
 import static 
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
-import static 
org.apache.asterix.external.util.google.gcs.GCSUtils.configureHdfsJobConf;
+import static 
org.apache.asterix.external.util.google.gcs.GCSAuthUtils.configureHdfsJobConf;
 import static 
org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
@@ -727,7 +727,7 @@ public class ExternalDataUtils {
                 validateAzureDataLakeProperties(configuration, srcLoc, 
collector, appCtx);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_GCS:
-                validateProperties(configuration, srcLoc, collector);
+                validateProperties(appCtx, configuration, srcLoc, collector);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_HDFS:
                 HDFSUtils.validateProperties(configuration, srcLoc, collector);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index f6ca480dcb..035415d9c9 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -105,7 +105,7 @@ import 
software.amazon.awssdk.services.sts.model.Credentials;
 public class S3AuthUtils {
     enum AuthenticationType {
         ANONYMOUS,
-        ARN,
+        ARN_ASSUME_ROLE,
         INSTANCE_PROFILE,
         ACCESS_KEYS,
         BAD_AUTHENTICATION
@@ -120,7 +120,8 @@ public class S3AuthUtils {
     }
 
     public static boolean isArnAssumedRoleExpiredToken(Map<String, String> 
configuration, String errorCode) {
-        return ERROR_EXPIRED_TOKEN.equals(errorCode) && 
getAuthenticationType(configuration) == AuthenticationType.ARN;
+        return ERROR_EXPIRED_TOKEN.equals(errorCode)
+                && getAuthenticationType(configuration) == 
AuthenticationType.ARN_ASSUME_ROLE;
     }
 
     /**
@@ -168,7 +169,7 @@ public class S3AuthUtils {
         switch (authenticationType) {
             case ANONYMOUS:
                 return AnonymousCredentialsProvider.create();
-            case ARN:
+            case ARN_ASSUME_ROLE:
                 return getTrustAccountCredentials(appCtx, configuration);
             case INSTANCE_PROFILE:
                 return getInstanceProfileCredentials(configuration);
@@ -206,7 +207,7 @@ public class S3AuthUtils {
         if (noAuth(configuration)) {
             return AuthenticationType.ANONYMOUS;
         } else if (roleArn != null) {
-            return AuthenticationType.ARN;
+            return AuthenticationType.ARN_ASSUME_ROLE;
         } else if (instanceProfile != null) {
             return AuthenticationType.INSTANCE_PROFILE;
         } else if (accessKeyId != null || secretAccessKey != null) {
@@ -232,11 +233,11 @@ public class S3AuthUtils {
     }
 
     /**
-     * Returns the cached credentials if valid, otherwise, generates new 
credentials by assume a role
+     * Returns the cached credentials if valid, otherwise, generates new 
credentials
      *
      * @param appCtx application context
      * @param configuration configuration
-     * @return returns the cached credentials if valid, otherwise, generates 
new credentials by assume a role
+     * @return returns the cached credentials if valid, otherwise, generates 
new credentials
      * @throws CompilationException CompilationException
      */
     public static AwsCredentialsProvider 
getTrustAccountCredentials(IApplicationContext appCtx,
@@ -278,20 +279,8 @@ public class S3AuthUtils {
         if (externalId != null) {
             builder.externalId(externalId);
         }
-
-        // credentials to be used to assume the role
-        AwsCredentialsProvider credentialsProvider;
         AssumeRoleRequest request = builder.build();
-        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        if ("true".equalsIgnoreCase(instanceProfile)) {
-            credentialsProvider = getInstanceProfileCredentials(configuration, 
true);
-        } else if (accessKeyId != null && secretAccessKey != null) {
-            credentialsProvider = getAccessKeyCredentials(configuration, true);
-        } else {
-            throw new 
CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
-        }
+        AwsCredentialsProvider credentialsProvider = 
getCredentialsToAssumeRole(configuration);
 
         // assume the role from the provided arn
         try (StsClient stsClient =
@@ -305,13 +294,22 @@ public class S3AuthUtils {
         }
     }
 
-    private static AwsCredentialsProvider 
getInstanceProfileCredentials(Map<String, String> configuration)
+    private static AwsCredentialsProvider 
getCredentialsToAssumeRole(Map<String, String> configuration)
             throws CompilationException {
-        return getInstanceProfileCredentials(configuration, false);
+        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        if (instanceProfile != null) {
+            return getInstanceProfileCredentials(configuration);
+        } else if (accessKeyId != null || secretAccessKey != null) {
+            return getAccessKeyCredentials(configuration);
+        } else {
+            throw new 
CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
+        }
     }
 
-    private static AwsCredentialsProvider 
getInstanceProfileCredentials(Map<String, String> configuration,
-            boolean assumeRoleAuthentication) throws CompilationException {
+    private static AwsCredentialsProvider 
getInstanceProfileCredentials(Map<String, String> configuration)
+            throws CompilationException {
         String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
 
         // only "true" value is allowed
@@ -319,24 +317,17 @@ public class S3AuthUtils {
             throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
INSTANCE_PROFILE_FIELD_NAME, "true");
         }
 
-        if (!assumeRoleAuthentication) {
-            String notAllowed = getNonNull(configuration, 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                    SESSION_TOKEN_FIELD_NAME);
-            if (notAllowed != null) {
-                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                        INSTANCE_PROFILE_FIELD_NAME);
-            }
+        String notAllowed = getNonNull(configuration, 
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
+                SESSION_TOKEN_FIELD_NAME);
+        if (notAllowed != null) {
+            throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+                    INSTANCE_PROFILE_FIELD_NAME);
         }
         return InstanceProfileCredentialsProvider.create();
     }
 
     private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, 
String> configuration)
             throws CompilationException {
-        return getAccessKeyCredentials(configuration, false);
-    }
-
-    private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, 
String> configuration,
-            boolean assumeRoleAuthentication) throws CompilationException {
         String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
         String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
         String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
@@ -350,14 +341,6 @@ public class S3AuthUtils {
                     ACCESS_KEY_ID_FIELD_NAME);
         }
 
-        if (!assumeRoleAuthentication) {
-            String notAllowed = getNonNull(configuration, 
INSTANCE_PROFILE_FIELD_NAME, EXTERNAL_ID_FIELD_NAME);
-            if (notAllowed != null) {
-                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                        INSTANCE_PROFILE_FIELD_NAME);
-            }
-        }
-
         // use session token if provided
         if (sessionToken != null) {
             return StaticCredentialsProvider
@@ -423,13 +406,13 @@ public class S3AuthUtils {
      * @param configuration external details configuration
      */
     private static void setHadoopCredentials(IApplicationContext appCtx, 
JobConf jobConf,
-            Map<String, String> configuration) {
+            Map<String, String> configuration) throws CompilationException {
         AuthenticationType authenticationType = 
getAuthenticationType(configuration);
         switch (authenticationType) {
             case ANONYMOUS:
                 jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS);
                 break;
-            case ARN:
+            case ARN_ASSUME_ROLE:
                 jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, 
HADOOP_ASSUMED_ROLE);
                 jobConf.set(HADOOP_ASSUME_ROLE_ARN, 
configuration.get(ROLE_ARN_FIELD_NAME));
                 jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, 
configuration.get(EXTERNAL_ID_FIELD_NAME));
@@ -458,6 +441,7 @@ public class S3AuthUtils {
                 }
                 break;
             case BAD_AUTHENTICATION:
+                throw new 
CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
         }
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java
new file mode 100644
index 0000000000..4473ad759a
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.google.gcs;
+
import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_GCS_PROTOCOL;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.HadoopAuthServiceAccount.IMPERSONATE_SERVICE_ACCOUNT;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ImpersonatedCredentials;
import com.google.cloud.NoCredentials;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
+public class GCSAuthUtils {
+    enum AuthenticationType {
+        ANONYMOUS,
+        IMPERSONATE_SERVICE_ACCOUNT,
+        APPLICATION_DEFAULT_CREDENTIALS,
+        SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS,
+        BAD_AUTHENTICATION
+    }
+
+    private static final ObjectMapper JSON_CREDENTIALS_OBJECT_MAPPER = new 
ObjectMapper();
+    private static final List<String> READ_WRITE_SCOPE_PERMISSION =
+            
Collections.singletonList("https://www.googleapis.com/auth/devstorage.read_write";);
+    static {
+        
JSON_CREDENTIALS_OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
 true);
+    }
+
+    private GCSAuthUtils() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    /**
+     * Builds the client using the provided configuration
+     *
+     * @param appCtx application context
+     * @param configuration properties
+     * @return Storage client
+     * @throws CompilationException CompilationException
+     */
+    public static Storage buildClient(IApplicationContext appCtx, Map<String, 
String> configuration)
+            throws CompilationException {
+        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+
+        Credentials credentials = buildCredentials(appCtx, configuration);
+        StorageOptions.Builder builder = StorageOptions.newBuilder();
+        builder.setCredentials(credentials);
+
+        if (endpoint != null) {
+            builder.setHost(endpoint);
+        }
+
+        return builder.build().getService();
+    }
+
+    public static Credentials buildCredentials(IApplicationContext appCtx, 
Map<String, String> configuration) throws CompilationException {
+        AuthenticationType authenticationType = 
getAuthenticationType(configuration);
+        return switch (authenticationType) {
+            case ANONYMOUS -> NoCredentials.getInstance();
+            case IMPERSONATE_SERVICE_ACCOUNT -> 
getImpersonatedServiceAccountCredentials(appCtx, configuration);
+            case APPLICATION_DEFAULT_CREDENTIALS -> 
getApplicationDefaultCredentials(configuration);
+            case SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS -> 
getServiceAccountKeyCredentials(configuration);
+            case BAD_AUTHENTICATION -> throw new 
CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
+        };
+    }
+
+    private static AuthenticationType getAuthenticationType(Map<String, 
String> configuration) {
+        String impersonateServiceAccount = 
configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
+        String applicationDefaultCredentials = 
configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
+        String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+
+        if (noAuth(configuration)) {
+            return AuthenticationType.ANONYMOUS;
+        } else if (impersonateServiceAccount != null) {
+            return AuthenticationType.IMPERSONATE_SERVICE_ACCOUNT;
+        } else if (applicationDefaultCredentials != null) {
+            return AuthenticationType.APPLICATION_DEFAULT_CREDENTIALS;
+        } else if (jsonCredentials != null) {
+            return AuthenticationType.SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS;
+        } else {
+            return AuthenticationType.BAD_AUTHENTICATION;
+        }
+    }
+
+    private static boolean noAuth(Map<String, String> configuration) {
+        return getNonNull(configuration, 
APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME, JSON_CREDENTIALS_FIELD_NAME,
+                IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME) == null;
+    }
+
+    /**
+     * Returns the cached credentials if valid, otherwise, generates new 
credentials
+     *
+     * @param appCtx application context
+     * @param configuration configuration
+     * @return returns the cached credentials if valid, otherwise, generates 
new credentials
+     * @throws CompilationException CompilationException
+     */
+    public static GoogleCredentials 
getImpersonatedServiceAccountCredentials(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        GoogleCredentials sourceCredentials = 
getCredentialsToImpersonateServiceAccount(configuration);
+        String impersonateServiceAccount = 
configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
+        int duration = 
appCtx.getExternalProperties().getGcpImpersonateServiceAccountDuration();
+
+        // Create impersonated credentials
+        return ImpersonatedCredentials.create(sourceCredentials, 
impersonateServiceAccount, null,
+                READ_WRITE_SCOPE_PERMISSION, duration);
+    }
+
+    private static GoogleCredentials 
getCredentialsToImpersonateServiceAccount(Map<String, String> configuration)
+            throws CompilationException {
+        String applicationDefaultCredentials = 
configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
+        String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+
+        if (applicationDefaultCredentials != null) {
+            return getApplicationDefaultCredentials(configuration);
+        } else if (jsonCredentials != null) {
+            return getServiceAccountKeyCredentials(configuration);
+        } else {
+            throw new CompilationException(
+                    
ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT);
+        }
+    }
+
+    private static GoogleCredentials 
getApplicationDefaultCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        try {
+            String notAllowed = getNonNull(configuration, 
JSON_CREDENTIALS_FIELD_NAME);
+            if (notAllowed != null) {
+                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+                        APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
+            }
+            return GoogleCredentials.getApplicationDefault();
+        } catch (Exception ex) {
+            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
+        }
+    }
+
+    private static GoogleCredentials 
getServiceAccountKeyCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+        try (InputStream credentialsStream = new 
ByteArrayInputStream(jsonCredentials.getBytes())) {
+            return GoogleCredentials.fromStream(credentialsStream);
+        } catch (IOException ex) {
+            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
+        } catch (Exception ex) {
+            throw new CompilationException(EXTERNAL_SOURCE_ERROR,
+                    "Encountered an issue while processing the JSON 
credentials. Please ensure the provided credentials are valid.");
+        }
+    }
+
+    public static void configureHdfsJobConf(JobConf jobConf, Map<String, 
String> configuration)
+            throws AlgebricksException {
+        configureHdfsJobConf(jobConf, configuration, 0);
+    }
+
+    /**
+     * Builds the client using the provided configuration
+     *
+     * @param configuration      properties
+     * @param numberOfPartitions number of partitions
+     */
+    public static void configureHdfsJobConf(JobConf jobConf, Map<String, 
String> configuration, int numberOfPartitions)
+            throws AlgebricksException {
+        setHadoopCredentials(jobConf, configuration);
+
+        // set endpoint if provided, default is https://storage.googleapis.com/
+        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+        if (endpoint != null) {
+            jobConf.set(HADOOP_ENDPOINT, endpoint);
+        }
+
+        // disable caching FileSystem
+        HDFSUtils.disableHadoopFileSystemCache(jobConf, HADOOP_GCS_PROTOCOL);
+
+        // TODO(htowaileb): make configurable, in case we hit rate limits then 
we can reduce it, default is 15
+        if (numberOfPartitions != 0) {
+            jobConf.set(GCSConstants.MAX_BATCH_THREADS, 
String.valueOf(numberOfPartitions));
+        }
+
+        // recommended to be disabled by GCP hadoop team
+        jobConf.set(GCSConstants.HADOOP_SUPPORT_COMPRESSED, 
ExternalDataConstants.FALSE);
+    }
+
+    /**
+     * Sets the credentials provider type and the credentials to hadoop based 
on the provided configuration
+     *
+     * @param jobConf hadoop job config
+     * @param configuration external details configuration
+     * @throws CompilationException CompilationException
+     */
+    private static void setHadoopCredentials(JobConf jobConf, Map<String, 
String> configuration)
+            throws CompilationException {
+        AuthenticationType authenticationType = 
getAuthenticationType(configuration);
+        switch (authenticationType) {
+            case ANONYMOUS:
+                jobConf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED);
+                break;
+            case IMPERSONATE_SERVICE_ACCOUNT:
+                String impersonateServiceAccount = 
configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
+                jobConf.set(IMPERSONATE_SERVICE_ACCOUNT, 
impersonateServiceAccount);
+                setJsonCredentials(jobConf, configuration);
+                break;
+            case SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS:
+                setJsonCredentials(jobConf, configuration);
+                break;
+            case BAD_AUTHENTICATION:
+                throw new 
CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
+        }
+    }
+
+    /**
+     * Sets the Json credentials to hadoop job configuration
+     * Note:
+     * Setting these values instead of 
HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported
+     * in com.google.cloud.bigdataoss:util-hadoop only up to version 
hadoop3-2.2.x and is removed in
+     * version 3.x.y, which also removed support for hadoop-2
+     *
+     * @param jobConf hadoop job config
+     * @param configuration external details configuration
+     * @throws CompilationException CompilationException
+     */
+    private static void setJsonCredentials(JobConf jobConf, Map<String, 
String> configuration)
+            throws CompilationException {
+        try {
+            String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+            JsonNode jsonCreds = 
JSON_CREDENTIALS_OBJECT_MAPPER.readTree(jsonCredentials);
+            jobConf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID,
+                    
jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText());
+            jobConf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY,
+                    
jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText());
+            jobConf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL,
+                    
jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText());
+        } catch (JsonProcessingException e) {
+            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e, 
"Unable to parse Json Credentials",
+                    getMessageOrToString(e));
+        }
+    }
+
+    private static String getNonNull(Map<String, String> configuration, 
String... fieldNames) {
+        for (String fieldName : fieldNames) {
+            if (configuration.get(fieldName) != null) {
+                return fieldName;
+            }
+        }
+        return null;
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
index 739dbde549..19fd74fadb 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -40,23 +40,21 @@ public class GCSConstants {
     }
 
     public static final String APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME = 
"applicationDefaultCredentials";
+    public static final String IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME = 
"impersonateServiceAccount";
     public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
     public static final String ENDPOINT_FIELD_NAME = "endpoint";
     public static final String STORAGE_PREFIX = "prefix";
 
-    /*
-     * Hadoop internal configuration
-     */
+    // hadoop internal configuration
     public static final String HADOOP_GCS_PROTOCOL = "gs";
+    public static final String MAX_BATCH_THREADS = "fs.gs.batch.threads";
+    public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url";
+    public static final String HADOOP_SUPPORT_COMPRESSED = 
"fs.gs.inputstream.support.gzip.encoding.enable";
 
     // hadoop credentials
     public static final String HADOOP_AUTH_TYPE = "fs.gs.auth.type";
     public static final String HADOOP_AUTH_UNAUTHENTICATED = "UNAUTHENTICATED";
 
-    // gs hadoop parameters
-    public static final String HADOOP_SUPPORT_COMPRESSED = 
"fs.gs.inputstream.support.gzip.encoding.enable";
-    public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url";
-
     public static class JsonCredentials {
         public static final String PRIVATE_KEY_ID = "private_key_id";
         public static final String PRIVATE_KEY = "private_key";
@@ -64,6 +62,7 @@ public class GCSConstants {
     }
 
     public static class HadoopAuthServiceAccount {
+        public static final String IMPERSONATE_SERVICE_ACCOUNT = 
"fs.gs.auth.impersonation.service.account";
         public static final String PRIVATE_KEY_ID = 
"fs.gs.auth.service.account.private.key.id";
         public static final String PRIVATE_KEY = 
"fs.gs.auth.service.account.private.key";
         public static final String CLIENT_EMAIL = 
"fs.gs.auth.service.account.email";
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index d768c230ce..032f537bc3 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -18,32 +18,20 @@
  */
 package org.apache.asterix.external.util.google.gcs;
 
-import static 
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
-import static 
org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
-import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
 import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
-import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
-import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY;
-import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
-import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
-import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
-import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT;
-import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_GCS_PROTOCOL;
-import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME;
+import static 
org.apache.asterix.external.util.google.gcs.GCSAuthUtils.buildClient;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -51,25 +39,15 @@ import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.json.JsonReadFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.gax.paging.Page;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.BaseServiceException;
-import com.google.cloud.NoCredentials;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
 
 public class GCSUtils {
     private GCSUtils() {
@@ -77,73 +55,17 @@ public class GCSUtils {
 
     }
 
-    private static final ObjectMapper JSON_CREDENTIALS_OBJECT_MAPPER = new 
ObjectMapper();
-    static {
-        
JSON_CREDENTIALS_OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
 true);
-    }
-
-    /**
-     * Builds the client using the provided configuration
-     *
-     * @param configuration properties
-     * @return 
clientasterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
-     * @throws CompilationException CompilationException
-     */
-    public static Storage buildClient(Map<String, String> configuration) 
throws CompilationException {
-        String applicationDefaultCredentials = 
configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
-        String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
-        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
-
-        StorageOptions.Builder builder = StorageOptions.newBuilder();
-        
builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY);
-
-        // default credentials provider
-        if (applicationDefaultCredentials != null) {
-            // only "true" value is allowed
-            if (!applicationDefaultCredentials.equalsIgnoreCase("true")) {
-                throw new 
CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE,
-                        APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME, "true");
-            }
-
-            // no other authentication parameters are allowed
-            if (jsonCredentials != null) {
-                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, 
JSON_CREDENTIALS_FIELD_NAME,
-                        APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
-            }
-
-            try {
-                
builder.setCredentials(GoogleCredentials.getApplicationDefault());
-            } catch (Exception ex) {
-                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
-            }
-        } else if (jsonCredentials != null) {
-            try (InputStream credentialsStream = new 
ByteArrayInputStream(jsonCredentials.getBytes())) {
-                
builder.setCredentials(GoogleCredentials.fromStream(credentialsStream));
-            } catch (IOException ex) {
-                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
-            } catch (Exception ex) {
-                throw new CompilationException(EXTERNAL_SOURCE_ERROR,
-                        "Encountered an issue while processing the JSON 
credentials. Please ensure the provided credentials are valid.");
-            }
-        } else {
-            builder.setCredentials(NoCredentials.getInstance());
-        }
-
-        if (endpoint != null) {
-            builder.setHost(endpoint);
-        }
-
-        return builder.build().getService();
-    }
-
     /**
      * Validate external dataset properties
      *
+     * @param appCtx application context
      * @param configuration properties
+     * @param srcLoc source location
+     * @param collector warning collector
      * @throws CompilationException Compilation exception
      */
-    public static void validateProperties(Map<String, String> configuration, 
SourceLocation srcLoc,
-            IWarningCollector collector) throws CompilationException {
+    public static void validateProperties(IApplicationContext appCtx, 
Map<String, String> configuration,
+            SourceLocation srcLoc, IWarningCollector collector) throws 
CompilationException {
         if (isDeltaTable(configuration)) {
             validateDeltaTableProperties(configuration);
         }
@@ -154,7 +76,6 @@ public class GCSUtils {
 
         validateIncludeExclude(configuration);
         try {
-            // TODO(htowaileb): maybe something better, this will check to 
ensure type is supported before creation
             new ExternalDataPrefix(configuration);
         } catch (AlgebricksException ex) {
             throw new 
CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
@@ -162,36 +83,33 @@ public class GCSUtils {
 
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
 
-        try {
+        try (Storage storage = buildClient(appCtx, configuration)) {
             Storage.BlobListOption limitOption = 
Storage.BlobListOption.pageSize(1);
             Storage.BlobListOption prefixOption = 
Storage.BlobListOption.prefix(getPrefix(configuration));
-            Storage storage = buildClient(configuration);
             Page<Blob> items = storage.list(container, limitOption, 
prefixOption);
 
             if (!items.iterateAll().iterator().hasNext() && 
collector.shouldWarn()) {
                 Warning warning = Warning.of(srcLoc, 
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
                 collector.warn(warning);
             }
-        } catch (CompilationException ex) {
-            throw ex;
         } catch (Exception ex) {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
         }
     }
 
-    public static List<Blob> listItems(Map<String, String> configuration, 
IncludeExcludeMatcher includeExcludeMatcher,
-            IWarningCollector warningCollector, ExternalDataPrefix 
externalDataPrefix,
-            IExternalFilterEvaluator evaluator) throws CompilationException, 
HyracksDataException {
+    public static List<Blob> listItems(IApplicationContext appCtx, Map<String, 
String> configuration,
+            IncludeExcludeMatcher includeExcludeMatcher, IWarningCollector 
warningCollector,
+            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator 
evaluator)
+            throws CompilationException, HyracksDataException {
         // Prepare to retrieve the objects
         List<Blob> filesOnly = new ArrayList<>();
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        Storage gcs = buildClient(configuration);
         Storage.BlobListOption options = 
Storage.BlobListOption.prefix(ExternalDataUtils.getPrefix(configuration));
         Page<Blob> items;
 
-        try {
+        try (Storage gcs = buildClient(appCtx, configuration)) {
             items = gcs.list(container, options);
-        } catch (BaseServiceException ex) {
+        } catch (Exception ex) {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
         }
 
@@ -224,61 +142,6 @@ public class GCSUtils {
         }
     }
 
-    public static void configureHdfsJobConf(JobConf conf, Map<String, String> 
configuration)
-            throws AlgebricksException {
-        configureHdfsJobConf(conf, configuration, 0);
-    }
-
-    /**
-     * Builds the client using the provided configuration
-     *
-     * @param configuration      properties
-     * @param numberOfPartitions number of partitions in the cluster
-     */
-    public static void configureHdfsJobConf(JobConf conf, Map<String, String> 
configuration, int numberOfPartitions)
-            throws AlgebricksException {
-        String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
-        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
-
-        // disable caching FileSystem
-        HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_GCS_PROTOCOL);
-
-        // TODO(htowaileb): needs further testing, recommended to disable by 
gcs-hadoop team
-        conf.set(GCSConstants.HADOOP_SUPPORT_COMPRESSED, 
ExternalDataConstants.FALSE);
-
-        // TODO(htowaileb): needs further testing
-        // set number of threads
-        //        conf.set(GCSConstants.HADOOP_MAX_REQUESTS_PER_BATCH, 
String.valueOf(numberOfPartitions));
-        //        conf.set(GCSConstants.HADOOP_BATCH_THREADS, 
String.valueOf(numberOfPartitions));
-
-        // authentication method
-        if (jsonCredentials == null) {
-            // anonymous access
-            conf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED);
-        } else {
-            try {
-                JsonNode jsonCreds = 
JSON_CREDENTIALS_OBJECT_MAPPER.readTree(jsonCredentials);
-                // Setting these values instead of 
HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported
-                // in com.google.cloud.bigdataoss:util-hadoop only up to 
version hadoop3-2.2.x and is removed in
-                // version 3.x.y, which also removed support for hadoop-2
-                conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID,
-                        
jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText());
-                conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY,
-                        
jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText());
-                conf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL,
-                        
jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText());
-            } catch (JsonProcessingException e) {
-                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, 
"Unable to parse Json Credentials",
-                        getMessageOrToString(e));
-            }
-        }
-
-        // set endpoint if provided, default is https://storage.googleapis.com/
-        if (endpoint != null) {
-            conf.set(HADOOP_ENDPOINT, endpoint);
-        }
-    }
-
     public static String getPath(Map<String, String> configuration) {
         return GCSConstants.HADOOP_GCS_PROTOCOL + "://"
                 + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'

Reply via email to