This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f0c442cba [ASTERIXDB-3503][EXT] Introduce Delta Lake Support for Google Cloud Storage (GCS)
6f0c442cba is described below

commit 6f0c442cba65b3c0a85a4b59b05462ff1e7a7b98
Author: ayush.tripathi <[email protected]>
AuthorDate: Thu Nov 28 16:13:02 2024 +0530

    [ASTERIXDB-3503][EXT] Introduce Delta Lake Support for Google Cloud Storage (GCS)
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Ext-ref: MB-64376
    
    Change-Id: I4cd44ba31a22cc124e346b077a1c2798ba9ab747
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19140
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Ali Alsuliman <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../reader/aws/delta/AwsS3DeltaReaderFactory.java  | 231 +--------------------
 .../reader/aws/delta/DeltaFileRecordReader.java    |  25 +--
 ...aReaderFactory.java => DeltaReaderFactory.java} |  54 ++---
 .../reader/gcs/delta/GCSDeltaReaderFactory.java    |  51 +++++
 .../asterix/external/util/ExternalDataUtils.java   |  21 +-
 .../asterix/external/util/aws/s3/S3AuthUtils.java  |  14 +-
 .../asterix/external/util/aws/s3/S3Utils.java      |   6 +
 .../asterix/external/util/google/gcs/GCSUtils.java |  19 +-
 ...pache.asterix.external.api.IRecordReaderFactory |   3 +-
 9 files changed, 132 insertions(+), 292 deletions(-)

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index d29cd40b15..ba0d0f4c2c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -18,245 +18,34 @@
  */
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import static 
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
 
-import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.api.IExternalDataRuntimeContext;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import 
org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.projection.FunctionCallInformation;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import io.delta.kernel.Scan;
-import io.delta.kernel.Snapshot;
-import io.delta.kernel.data.FilteredColumnarBatch;
-import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
-import io.delta.kernel.engine.Engine;
-import io.delta.kernel.exceptions.KernelException;
-import io.delta.kernel.internal.InternalScanFileUtils;
-import io.delta.kernel.types.StructType;
-import io.delta.kernel.utils.CloseableIterator;
-import io.delta.kernel.utils.FileStatus;
-
-public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
+import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.hadoop.mapred.JobConf;
 
+public class AwsS3DeltaReaderFactory extends DeltaReaderFactory {
     private static final long serialVersionUID = 1L;
-    private static final List<String> recordReaderNames =
+    private static final List<String> RECORD_READER_NAMES =
             
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
-    private static final Logger LOGGER = LogManager.getLogger();
-    private transient AlgebricksAbsolutePartitionConstraint 
locationConstraints;
-    private String scanState;
-    private Map<String, String> configuration;
-    protected final List<PartitionWorkLoadBasedOnSize> 
partitionWorkLoadsBasedOnSize = new ArrayList<>();
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        return locationConstraints;
-    }
-
-    @Override
-    public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration,
-            IWarningCollector warningCollector, 
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
-            throws AlgebricksException, HyracksDataException {
-        this.configuration = configuration;
-        Configuration conf = new Configuration();
-        applyConfiguration(configuration, conf);
-        String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
-                + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
-                + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
-
-        ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
-
-        Engine engine = DefaultEngine.create(conf);
-        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, 
tableMetadataPath);
-        Snapshot snapshot;
-        try {
-            snapshot = table.getLatestSnapshot(engine);
-        } catch (KernelException e) {
-            LOGGER.info("Failed to get latest snapshot for table: {}", 
tableMetadataPath, e);
-            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
e, getMessageOrToString(e));
-        }
-
-        List<Warning> warnings = new ArrayList<>();
-        DeltaConverterContext converterContext = new 
DeltaConverterContext(configuration, warnings);
-        AsterixTypeToDeltaTypeVisitor visitor = new 
AsterixTypeToDeltaTypeVisitor(converterContext);
-        StructType requiredSchema;
-        try {
-            ARecordType expectedType = HDFSUtils.getExpectedType(conf);
-            Map<String, FunctionCallInformation> functionCallInformationMap =
-                    HDFSUtils.getFunctionCallInformationMap(conf);
-            StructType fileSchema = snapshot.getSchema(engine);
-            requiredSchema = visitor.clipType(expectedType, fileSchema, 
functionCallInformationMap);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } catch (AsterixDeltaRuntimeException e) {
-            throw e.getHyracksDataException();
-        }
-        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema).build();
-        scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
-        CloseableIterator<FilteredColumnarBatch> iter = 
scan.getScanFiles(engine);
-
-        List<Row> scanFiles = new ArrayList<>();
-        while (iter.hasNext()) {
-            FilteredColumnarBatch batch = iter.next();
-            CloseableIterator<Row> rowIter = batch.getRows();
-            while (rowIter.hasNext()) {
-                Row row = rowIter.next();
-                scanFiles.add(row);
-            }
-        }
-        locationConstraints = configureLocationConstraints(appCtx, scanFiles);
-        configuration.put(ExternalDataConstants.KEY_PARSER, 
ExternalDataConstants.FORMAT_DELTA);
-        distributeFiles(scanFiles);
-        issueWarnings(warnings, warningCollector);
-    }
-
-    private void issueWarnings(List<Warning> warnings, IWarningCollector 
warningCollector) {
-        if (!warnings.isEmpty()) {
-            for (Warning warning : warnings) {
-                if (warningCollector.shouldWarn()) {
-                    warningCollector.warn(warning);
-                }
-            }
-        }
-        warnings.clear();
-    }
-
-    private AlgebricksAbsolutePartitionConstraint 
configureLocationConstraints(ICcApplicationContext appCtx,
-            List<Row> scanFiles) {
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-
-        String[] locations = csm.getClusterLocations().getLocations();
-        if (scanFiles.size() == 0) {
-            return 
AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
-        } else if (locations.length > scanFiles.size()) {
-            LOGGER.debug(
-                    "analytics partitions ({}) exceeds total partition count 
({}); limiting ingestion partitions to total partition count",
-                    locations.length, scanFiles.size());
-            final String[] locationCopy = locations.clone();
-            ArrayUtils.shuffle(locationCopy);
-            locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locations);
-    }
-
-    private void distributeFiles(List<Row> scanFiles) {
-        final int partitionsCount = 
getPartitionConstraint().getLocations().length;
-        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new 
PriorityQueue<>(partitionsCount,
-                
Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
-
-        // Prepare the workloads based on the number of partitions
-        for (int i = 0; i < partitionsCount; i++) {
-            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
-        }
-        for (Row scanFileRow : scanFiles) {
-            PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
-            FileStatus fileStatus = 
InternalScanFileUtils.getAddFileStatus(scanFileRow);
-            workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), 
fileStatus.getSize());
-            workloadQueue.add(workload);
-        }
-        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
-    }
-
-    public static void applyConfiguration(Map<String, String> configuration, 
Configuration conf) {
-        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, 
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            conf.set(S3Constants.HADOOP_SESSION_TOKEN, 
configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        conf.set(S3Constants.HADOOP_REGION, 
configuration.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
-        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
-                
configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
-        
conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
-                
configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
 ""));
-    }
 
     @Override
-    public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext 
context) throws HyracksDataException {
-        try {
-            int partition = context.getPartition();
-            return new 
DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(),
 scanState,
-                    configuration, context);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
+    protected void configureJobConf(JobConf conf, Map<String, String> 
configuration) {
+        configureAwsS3HdfsJobConf(conf, configuration);
     }
 
     @Override
-    public Class<?> getRecordClass() throws AsterixException {
-        return Row.class;
+    protected String getTablePath(Map<String, String> configuration) {
+        return S3Utils.getPath(configuration);
     }
 
     @Override
     public List<String> getRecordReaderNames() {
-        return recordReaderNames;
-    }
-
-    @Override
-    public Set<String> getReaderSupportedFormats() {
-        return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
-    }
-
-    public static class PartitionWorkLoadBasedOnSize implements Serializable {
-        private static final long serialVersionUID = 1L;
-        private final List<String> scanFiles = new ArrayList<>();
-        private long totalSize = 0;
-
-        public PartitionWorkLoadBasedOnSize() {
-        }
-
-        public List<String> getScanFiles() {
-            return scanFiles;
-        }
-
-        public void addScanFile(String scanFile, long size) {
-            this.scanFiles.add(scanFile);
-            this.totalSize += size;
-        }
-
-        public long getTotalSize() {
-            return totalSize;
-        }
-
-        @Override
-        public String toString() {
-            return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
-        }
+        return RECORD_READER_NAMES;
     }
 
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index a5b21b6cad..a094c221e5 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,24 +19,21 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
 import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
-import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.IFeedLogManager;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 
 import io.delta.kernel.Scan;
 import io.delta.kernel.data.ColumnarBatch;
@@ -69,20 +66,10 @@ public class DeltaFileRecordReader implements 
IRecordReader<Row> {
     private Row scanFile;
     private CloseableIterator<Row> rows;
 
-    public DeltaFileRecordReader(List<String> serScanFiles, String 
serScanState, Map<String, String> conf,
-            IExternalDataRuntimeContext context) {
-        Configuration config = new Configuration();
-        config.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
conf.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        config.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, 
conf.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            config.set(S3Constants.HADOOP_SESSION_TOKEN, 
conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        config.set(S3Constants.HADOOP_REGION, 
conf.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = conf.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            config.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
-        this.engine = DefaultEngine.create(config);
+    public DeltaFileRecordReader(List<String> serScanFiles, String 
serScanState, ConfFactory config)
+            throws HyracksDataException {
+        JobConf conf = config.getConf();
+        this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
         for (String scanFile : serScanFiles) {
             this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
similarity index 80%
copy from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
copy to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index d29cd40b15..dc4c310660 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
@@ -43,17 +42,17 @@ import org.apache.asterix.external.api.IRecordReaderFactory;
 import 
org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -69,35 +68,34 @@ import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;
 import io.delta.kernel.utils.FileStatus;
 
-public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
+public abstract class DeltaReaderFactory implements 
IRecordReaderFactory<Object> {
 
     private static final long serialVersionUID = 1L;
-    private static final List<String> recordReaderNames =
-            
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
     private static final Logger LOGGER = LogManager.getLogger();
     private transient AlgebricksAbsolutePartitionConstraint 
locationConstraints;
     private String scanState;
-    private Map<String, String> configuration;
     protected final List<PartitionWorkLoadBasedOnSize> 
partitionWorkLoadsBasedOnSize = new ArrayList<>();
+    protected ConfFactory confFactory;
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
         return locationConstraints;
     }
 
+    protected abstract void configureJobConf(JobConf conf, Map<String, String> 
configuration)
+            throws AlgebricksException;
+
+    protected abstract String getTablePath(Map<String, String> configuration) 
throws AlgebricksException;
+
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration,
             IWarningCollector warningCollector, 
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
             throws AlgebricksException, HyracksDataException {
-        this.configuration = configuration;
-        Configuration conf = new Configuration();
-        applyConfiguration(configuration, conf);
-        String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
-                + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
-                + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
-
+        JobConf conf = new JobConf();
         ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
-
+        configureJobConf(conf, configuration);
+        confFactory = new ConfFactory(conf);
+        String tableMetadataPath = getTablePath(configuration);
         Engine engine = DefaultEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, 
tableMetadataPath);
         Snapshot snapshot;
@@ -162,7 +160,7 @@ public class AwsS3DeltaReaderFactory implements 
IRecordReaderFactory<Object> {
             return 
AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
         } else if (locations.length > scanFiles.size()) {
             LOGGER.debug(
-                    "analytics partitions ({}) exceeds total partition count 
({}); limiting ingestion partitions to total partition count",
+                    "configured partitions ({}) exceeds total partition count 
({}); limiting configured partitions to total partition count",
                     locations.length, scanFiles.size());
             final String[] locationCopy = locations.clone();
             ArrayUtils.shuffle(locationCopy);
@@ -189,29 +187,12 @@ public class AwsS3DeltaReaderFactory implements 
IRecordReaderFactory<Object> {
         partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
     }
 
-    public static void applyConfiguration(Map<String, String> configuration, 
Configuration conf) {
-        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, 
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            conf.set(S3Constants.HADOOP_SESSION_TOKEN, 
configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        conf.set(S3Constants.HADOOP_REGION, 
configuration.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
-        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
-                
configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
-        
conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
-                
configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
 ""));
-    }
-
     @Override
     public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext 
context) throws HyracksDataException {
         try {
             int partition = context.getPartition();
             return new 
DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(),
 scanState,
-                    configuration, context);
+                    confFactory);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -222,11 +203,6 @@ public class AwsS3DeltaReaderFactory implements 
IRecordReaderFactory<Object> {
         return Row.class;
     }
 
-    @Override
-    public List<String> getRecordReaderNames() {
-        return recordReaderNames;
-    }
-
     @Override
     public Set<String> getReaderSupportedFormats() {
         return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
new file mode 100644
index 0000000000..ee88569d52
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.gcs.delta;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import 
org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public class GCSDeltaReaderFactory extends DeltaReaderFactory {
+    private static final long serialVersionUID = 1L;
+    private static final List<String> RECORD_READER_NAMES =
+            
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
+
+    @Override
+    protected void configureJobConf(JobConf conf, Map<String, String> 
configuration) throws AlgebricksException {
+        GCSUtils.configureHdfsJobConf(conf, configuration);
+    }
+
+    @Override
+    protected String getTablePath(Map<String, String> configuration) throws 
AlgebricksException {
+        return GCSUtils.getPath(configuration);
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return RECORD_READER_NAMES;
+    }
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index c7deb7c762..6767f933b0 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -29,8 +29,10 @@ import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_PATH;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
 import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
 import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
+import static 
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
 import static 
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
 import static 
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
+import static 
org.apache.asterix.external.util.google.gcs.GCSUtils.configureHdfsJobConf;
 import static 
org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
@@ -72,14 +74,15 @@ import 
org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
-import 
org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory;
 import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
 import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;
 import org.apache.asterix.external.util.google.gcs.GCSConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AUnionType;
@@ -90,6 +93,7 @@ import 
org.apache.asterix.runtime.evaluators.common.NumberUtils;
 import 
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -515,21 +519,22 @@ public class ExternalDataUtils {
         }
     }
 
-    public static void validateDeltaTableExists(Map<String, String> 
configuration) throws CompilationException {
-        Configuration conf = new Configuration();
+    public static void validateDeltaTableExists(Map<String, String> 
configuration) throws AlgebricksException {
         String tableMetadataPath = null;
+        JobConf conf = new JobConf();
         if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
                 .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
-            AwsS3DeltaReaderFactory.applyConfiguration(configuration, conf);
-            tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
-                    + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
-                    + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+            configureAwsS3HdfsJobConf(conf, configuration);
+            tableMetadataPath = S3Utils.getPath(configuration);
+        } else if 
(configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
+                .equals(ExternalDataConstants.KEY_ADAPTER_NAME_GCS)) {
+            configureHdfsJobConf(conf, configuration);
+            tableMetadataPath = GCSUtils.getPath(configuration);
         } else {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
                     "Delta format is not supported for the external source 
type: "
                             + 
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE));
         }
-
         Engine engine = DefaultEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, 
tableMetadataPath);
         try {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index f36d25dd5f..45988e8ef3 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -333,6 +333,10 @@ public class S3AuthUtils {
      * @param configuration      properties
      * @param numberOfPartitions number of partitions in the cluster
      */
+    public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, 
String> configuration) {
+        configureAwsS3HdfsJobConf(conf, configuration, 0);
+    }
+
     public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, 
String> configuration,
             int numberOfPartitions) {
         String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
@@ -371,7 +375,9 @@ public class S3AuthUtils {
         /*
          * Set the size of S3 connection pool to be the number of partitions
          */
-        conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, 
String.valueOf(numberOfPartitions));
+        if (numberOfPartitions != 0) {
+            conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, 
String.valueOf(numberOfPartitions));
+        }
 
         if (serviceEndpoint != null) {
             // Validation of the URL should be done at hadoop-aws level
@@ -470,7 +476,11 @@ public class S3AuthUtils {
             throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
         }
         if (isDeltaTable(configuration)) {
-            validateDeltaTableExists(configuration);
+            try {
+                validateDeltaTableExists(configuration);
+            } catch (AlgebricksException e) {
+                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e);
+            }
         }
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index a2b50e1fc0..d8dd478da7 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -292,4 +292,10 @@ public class S3Utils {
         allObjects.put("folders", folders);
         return allObjects;
     }
+
+    public static String getPath(Map<String, String> configuration) {
+        return S3Constants.HADOOP_S3_PROTOCOL + "://"
+                + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 74a664da35..bfd35fcec3 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -22,6 +22,8 @@ import static 
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERR
 import static 
org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
 import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
 import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
@@ -140,9 +142,11 @@ public class GCSUtils {
      */
     public static void validateProperties(Map<String, String> configuration, 
SourceLocation srcLoc,
             IWarningCollector collector) throws CompilationException {
-
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableProperties(configuration);
+        }
         // check if the format property is present
-        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+        else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_FORMAT);
         }
 
@@ -224,6 +228,11 @@ public class GCSUtils {
      * @param configuration      properties
      * @param numberOfPartitions number of partitions in the cluster
      */
+    public static void configureHdfsJobConf(JobConf conf, Map<String, String> 
configuration)
+            throws AlgebricksException {
+        configureHdfsJobConf(conf, configuration, 0);
+    }
+
     public static void configureHdfsJobConf(JobConf conf, Map<String, String> 
configuration, int numberOfPartitions)
             throws AlgebricksException {
         String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
@@ -267,4 +276,10 @@ public class GCSUtils {
             conf.set(HADOOP_ENDPOINT, endpoint);
         }
     }
+
+    public static String getPath(Map<String, String> configuration) {
+        return GCSConstants.HADOOP_GCS_PROTOCOL + "://"
+                + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 2c15b5a844..1f25c4baf2 100644
--- 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -28,4 +28,5 @@ 
org.apache.asterix.external.input.record.reader.azure.blob.AzureBlobReaderFactor
 
org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
 
org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
 
org.apache.asterix.external.input.record.reader.azure.parquet.AzureDataLakeParquetReaderFactory
-org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory
\ No newline at end of file
+org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory
+org.apache.asterix.external.input.record.reader.gcs.delta.GCSDeltaReaderFactory
\ No newline at end of file


Reply via email to