This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6f0c442cba [ASTERIXDB-3503][EXT] Introduce Delta Lake Support for
Google Cloud Storage (GCS)
6f0c442cba is described below
commit 6f0c442cba65b3c0a85a4b59b05462ff1e7a7b98
Author: ayush.tripathi <[email protected]>
AuthorDate: Thu Nov 28 16:13:02 2024 +0530
[ASTERIXDB-3503][EXT] Introduce Delta Lake Support for Google Cloud Storage
(GCS)
- user model changes: no
- storage format changes: no
- interface changes: yes
Ext-ref: MB-64376
Change-Id: I4cd44ba31a22cc124e346b077a1c2798ba9ab747
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19140
Integration-Tests: Jenkins <[email protected]>
Tested-by: Ali Alsuliman <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../reader/aws/delta/AwsS3DeltaReaderFactory.java | 231 +--------------------
.../reader/aws/delta/DeltaFileRecordReader.java | 25 +--
...aReaderFactory.java => DeltaReaderFactory.java} | 54 ++---
.../reader/gcs/delta/GCSDeltaReaderFactory.java | 51 +++++
.../asterix/external/util/ExternalDataUtils.java | 21 +-
.../asterix/external/util/aws/s3/S3AuthUtils.java | 14 +-
.../asterix/external/util/aws/s3/S3Utils.java | 6 +
.../asterix/external/util/google/gcs/GCSUtils.java | 19 +-
...pache.asterix.external.api.IRecordReaderFactory | 3 +-
9 files changed, 132 insertions(+), 292 deletions(-)
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index d29cd40b15..ba0d0f4c2c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -18,245 +18,34 @@
*/
package org.apache.asterix.external.input.record.reader.aws.delta;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import static
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.api.IExternalDataRuntimeContext;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import
org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.projection.FunctionCallInformation;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import io.delta.kernel.Scan;
-import io.delta.kernel.Snapshot;
-import io.delta.kernel.data.FilteredColumnarBatch;
-import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
-import io.delta.kernel.engine.Engine;
-import io.delta.kernel.exceptions.KernelException;
-import io.delta.kernel.internal.InternalScanFileUtils;
-import io.delta.kernel.types.StructType;
-import io.delta.kernel.utils.CloseableIterator;
-import io.delta.kernel.utils.FileStatus;
-
-public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
+import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.hadoop.mapred.JobConf;
+public class AwsS3DeltaReaderFactory extends DeltaReaderFactory {
private static final long serialVersionUID = 1L;
- private static final List<String> recordReaderNames =
+ private static final List<String> RECORD_READER_NAMES =
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
- private static final Logger LOGGER = LogManager.getLogger();
- private transient AlgebricksAbsolutePartitionConstraint
locationConstraints;
- private String scanState;
- private Map<String, String> configuration;
- protected final List<PartitionWorkLoadBasedOnSize>
partitionWorkLoadsBasedOnSize = new ArrayList<>();
-
- @Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- return locationConstraints;
- }
-
- @Override
- public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
- throws AlgebricksException, HyracksDataException {
- this.configuration = configuration;
- Configuration conf = new Configuration();
- applyConfiguration(configuration, conf);
- String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
- +
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
- +
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
-
- ICcApplicationContext appCtx = (ICcApplicationContext)
serviceCtx.getApplicationContext();
-
- Engine engine = DefaultEngine.create(conf);
- io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine,
tableMetadataPath);
- Snapshot snapshot;
- try {
- snapshot = table.getLatestSnapshot(engine);
- } catch (KernelException e) {
- LOGGER.info("Failed to get latest snapshot for table: {}",
tableMetadataPath, e);
- throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
e, getMessageOrToString(e));
- }
-
- List<Warning> warnings = new ArrayList<>();
- DeltaConverterContext converterContext = new
DeltaConverterContext(configuration, warnings);
- AsterixTypeToDeltaTypeVisitor visitor = new
AsterixTypeToDeltaTypeVisitor(converterContext);
- StructType requiredSchema;
- try {
- ARecordType expectedType = HDFSUtils.getExpectedType(conf);
- Map<String, FunctionCallInformation> functionCallInformationMap =
- HDFSUtils.getFunctionCallInformationMap(conf);
- StructType fileSchema = snapshot.getSchema(engine);
- requiredSchema = visitor.clipType(expectedType, fileSchema,
functionCallInformationMap);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (AsterixDeltaRuntimeException e) {
- throw e.getHyracksDataException();
- }
- Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine,
requiredSchema).build();
- scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
- CloseableIterator<FilteredColumnarBatch> iter =
scan.getScanFiles(engine);
-
- List<Row> scanFiles = new ArrayList<>();
- while (iter.hasNext()) {
- FilteredColumnarBatch batch = iter.next();
- CloseableIterator<Row> rowIter = batch.getRows();
- while (rowIter.hasNext()) {
- Row row = rowIter.next();
- scanFiles.add(row);
- }
- }
- locationConstraints = configureLocationConstraints(appCtx, scanFiles);
- configuration.put(ExternalDataConstants.KEY_PARSER,
ExternalDataConstants.FORMAT_DELTA);
- distributeFiles(scanFiles);
- issueWarnings(warnings, warningCollector);
- }
-
- private void issueWarnings(List<Warning> warnings, IWarningCollector
warningCollector) {
- if (!warnings.isEmpty()) {
- for (Warning warning : warnings) {
- if (warningCollector.shouldWarn()) {
- warningCollector.warn(warning);
- }
- }
- }
- warnings.clear();
- }
-
- private AlgebricksAbsolutePartitionConstraint
configureLocationConstraints(ICcApplicationContext appCtx,
- List<Row> scanFiles) {
- IClusterStateManager csm = appCtx.getClusterStateManager();
-
- String[] locations = csm.getClusterLocations().getLocations();
- if (scanFiles.size() == 0) {
- return
AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
- } else if (locations.length > scanFiles.size()) {
- LOGGER.debug(
- "analytics partitions ({}) exceeds total partition count
({}); limiting ingestion partitions to total partition count",
- locations.length, scanFiles.size());
- final String[] locationCopy = locations.clone();
- ArrayUtils.shuffle(locationCopy);
- locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
- }
- return new AlgebricksAbsolutePartitionConstraint(locations);
- }
-
- private void distributeFiles(List<Row> scanFiles) {
- final int partitionsCount =
getPartitionConstraint().getLocations().length;
- PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new
PriorityQueue<>(partitionsCount,
-
Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
-
- // Prepare the workloads based on the number of partitions
- for (int i = 0; i < partitionsCount; i++) {
- workloadQueue.add(new PartitionWorkLoadBasedOnSize());
- }
- for (Row scanFileRow : scanFiles) {
- PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
- FileStatus fileStatus =
InternalScanFileUtils.getAddFileStatus(scanFileRow);
- workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow),
fileStatus.getSize());
- workloadQueue.add(workload);
- }
- partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
- }
-
- public static void applyConfiguration(Map<String, String> configuration,
Configuration conf) {
- conf.set(S3Constants.HADOOP_ACCESS_KEY_ID,
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
- conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
- if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
- conf.set(S3Constants.HADOOP_SESSION_TOKEN,
configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
- }
- conf.set(S3Constants.HADOOP_REGION,
configuration.get(S3Constants.REGION_FIELD_NAME));
- String serviceEndpoint =
configuration.get(SERVICE_END_POINT_FIELD_NAME);
- if (serviceEndpoint != null) {
- conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
- }
- conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
-
configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
-
conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
-
configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
""));
- }
@Override
- public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext
context) throws HyracksDataException {
- try {
- int partition = context.getPartition();
- return new
DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(),
scanState,
- configuration, context);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
+ protected void configureJobConf(JobConf conf, Map<String, String>
configuration) {
+ configureAwsS3HdfsJobConf(conf, configuration);
}
@Override
- public Class<?> getRecordClass() throws AsterixException {
- return Row.class;
+ protected String getTablePath(Map<String, String> configuration) {
+ return S3Utils.getPath(configuration);
}
@Override
public List<String> getRecordReaderNames() {
- return recordReaderNames;
- }
-
- @Override
- public Set<String> getReaderSupportedFormats() {
- return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
- }
-
- public static class PartitionWorkLoadBasedOnSize implements Serializable {
- private static final long serialVersionUID = 1L;
- private final List<String> scanFiles = new ArrayList<>();
- private long totalSize = 0;
-
- public PartitionWorkLoadBasedOnSize() {
- }
-
- public List<String> getScanFiles() {
- return scanFiles;
- }
-
- public void addScanFile(String scanFile, long size) {
- this.scanFiles.add(scanFile);
- this.totalSize += size;
- }
-
- public long getTotalSize() {
- return totalSize;
- }
-
- @Override
- public String toString() {
- return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
- }
+ return RECORD_READER_NAMES;
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index a5b21b6cad..a094c221e5 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,24 +19,21 @@
package org.apache.asterix.external.input.record.reader.aws.delta;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.IFeedLogManager;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
import io.delta.kernel.Scan;
import io.delta.kernel.data.ColumnarBatch;
@@ -69,20 +66,10 @@ public class DeltaFileRecordReader implements
IRecordReader<Row> {
private Row scanFile;
private CloseableIterator<Row> rows;
- public DeltaFileRecordReader(List<String> serScanFiles, String
serScanState, Map<String, String> conf,
- IExternalDataRuntimeContext context) {
- Configuration config = new Configuration();
- config.set(S3Constants.HADOOP_ACCESS_KEY_ID,
conf.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
- config.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
conf.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
- if (conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
- config.set(S3Constants.HADOOP_SESSION_TOKEN,
conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
- }
- config.set(S3Constants.HADOOP_REGION,
conf.get(S3Constants.REGION_FIELD_NAME));
- String serviceEndpoint = conf.get(SERVICE_END_POINT_FIELD_NAME);
- if (serviceEndpoint != null) {
- config.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
- }
- this.engine = DefaultEngine.create(config);
+ public DeltaFileRecordReader(List<String> serScanFiles, String
serScanState, ConfFactory config)
+ throws HyracksDataException {
+ JobConf conf = config.getConf();
+ this.engine = DefaultEngine.create(conf);
this.scanFiles = new ArrayList<>();
for (String scanFile : serScanFiles) {
this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
similarity index 80%
copy from
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
copy to
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index d29cd40b15..dc4c310660 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.input.record.reader.aws.delta;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.IOException;
@@ -43,17 +42,17 @@ import org.apache.asterix.external.api.IRecordReaderFactory;
import
org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -69,35 +68,34 @@ import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
-public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
+public abstract class DeltaReaderFactory implements
IRecordReaderFactory<Object> {
private static final long serialVersionUID = 1L;
- private static final List<String> recordReaderNames =
-
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
private static final Logger LOGGER = LogManager.getLogger();
private transient AlgebricksAbsolutePartitionConstraint
locationConstraints;
private String scanState;
- private Map<String, String> configuration;
protected final List<PartitionWorkLoadBasedOnSize>
partitionWorkLoadsBasedOnSize = new ArrayList<>();
+ protected ConfFactory confFactory;
@Override
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
return locationConstraints;
}
+ protected abstract void configureJobConf(JobConf conf, Map<String, String>
configuration)
+ throws AlgebricksException;
+
+ protected abstract String getTablePath(Map<String, String> configuration)
throws AlgebricksException;
+
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
throws AlgebricksException, HyracksDataException {
- this.configuration = configuration;
- Configuration conf = new Configuration();
- applyConfiguration(configuration, conf);
- String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
- +
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
- +
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
-
+ JobConf conf = new JobConf();
ICcApplicationContext appCtx = (ICcApplicationContext)
serviceCtx.getApplicationContext();
-
+ configureJobConf(conf, configuration);
+ confFactory = new ConfFactory(conf);
+ String tableMetadataPath = getTablePath(configuration);
Engine engine = DefaultEngine.create(conf);
io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine,
tableMetadataPath);
Snapshot snapshot;
@@ -162,7 +160,7 @@ public class AwsS3DeltaReaderFactory implements
IRecordReaderFactory<Object> {
return
AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
} else if (locations.length > scanFiles.size()) {
LOGGER.debug(
- "analytics partitions ({}) exceeds total partition count
({}); limiting ingestion partitions to total partition count",
+ "configured partitions ({}) exceeds total partition count
({}); limiting configured partitions to total partition count",
locations.length, scanFiles.size());
final String[] locationCopy = locations.clone();
ArrayUtils.shuffle(locationCopy);
@@ -189,29 +187,12 @@ public class AwsS3DeltaReaderFactory implements
IRecordReaderFactory<Object> {
partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
}
- public static void applyConfiguration(Map<String, String> configuration,
Configuration conf) {
- conf.set(S3Constants.HADOOP_ACCESS_KEY_ID,
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
- conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
- if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
- conf.set(S3Constants.HADOOP_SESSION_TOKEN,
configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
- }
- conf.set(S3Constants.HADOOP_REGION,
configuration.get(S3Constants.REGION_FIELD_NAME));
- String serviceEndpoint =
configuration.get(SERVICE_END_POINT_FIELD_NAME);
- if (serviceEndpoint != null) {
- conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
- }
- conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
-
configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
-
conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
-
configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
""));
- }
-
@Override
public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext
context) throws HyracksDataException {
try {
int partition = context.getPartition();
return new
DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(),
scanState,
- configuration, context);
+ confFactory);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -222,11 +203,6 @@ public class AwsS3DeltaReaderFactory implements
IRecordReaderFactory<Object> {
return Row.class;
}
- @Override
- public List<String> getRecordReaderNames() {
- return recordReaderNames;
- }
-
@Override
public Set<String> getReaderSupportedFormats() {
return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
new file mode 100644
index 0000000000..ee88569d52
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.gcs.delta;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import
org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public class GCSDeltaReaderFactory extends DeltaReaderFactory {
+ private static final long serialVersionUID = 1L;
+ private static final List<String> RECORD_READER_NAMES =
+
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
+
+ @Override
+ protected void configureJobConf(JobConf conf, Map<String, String>
configuration) throws AlgebricksException {
+ GCSUtils.configureHdfsJobConf(conf, configuration);
+ }
+
+ @Override
+ protected String getTablePath(Map<String, String> configuration) throws
AlgebricksException {
+ return GCSUtils.getPath(configuration);
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return RECORD_READER_NAMES;
+ }
+
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index c7deb7c762..6767f933b0 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -29,8 +29,10 @@ import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_PATH;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
+import static
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
import static
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
import static
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
+import static
org.apache.asterix.external.util.google.gcs.GCSUtils.configureHdfsJobConf;
import static
org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
@@ -72,14 +74,15 @@ import
org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReaderFactory;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
-import
org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory;
import org.apache.asterix.external.library.JavaLibrary;
import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;
import org.apache.asterix.external.util.google.gcs.GCSConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
@@ -90,6 +93,7 @@ import
org.apache.asterix.runtime.evaluators.common.NumberUtils;
import
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -515,21 +519,22 @@ public class ExternalDataUtils {
}
}
- public static void validateDeltaTableExists(Map<String, String>
configuration) throws CompilationException {
- Configuration conf = new Configuration();
+ public static void validateDeltaTableExists(Map<String, String>
configuration) throws AlgebricksException {
String tableMetadataPath = null;
+ JobConf conf = new JobConf();
if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
.equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
- AwsS3DeltaReaderFactory.applyConfiguration(configuration, conf);
- tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
- +
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
- +
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+ configureAwsS3HdfsJobConf(conf, configuration);
+ tableMetadataPath = S3Utils.getPath(configuration);
+ } else if
(configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
+ .equals(ExternalDataConstants.KEY_ADAPTER_NAME_GCS)) {
+ configureHdfsJobConf(conf, configuration);
+ tableMetadataPath = GCSUtils.getPath(configuration);
} else {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
"Delta format is not supported for the external source
type: "
+
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE));
}
-
Engine engine = DefaultEngine.create(conf);
io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine,
tableMetadataPath);
try {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index f36d25dd5f..45988e8ef3 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -333,6 +333,10 @@ public class S3AuthUtils {
* @param configuration properties
* @param numberOfPartitions number of partitions in the cluster
*/
+ public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String,
String> configuration) {
+ configureAwsS3HdfsJobConf(conf, configuration, 0);
+ }
+
public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String,
String> configuration,
int numberOfPartitions) {
String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
@@ -371,7 +375,9 @@ public class S3AuthUtils {
/*
* Set the size of S3 connection pool to be the number of partitions
*/
- conf.set(HADOOP_S3_CONNECTION_POOL_SIZE,
String.valueOf(numberOfPartitions));
+ if (numberOfPartitions != 0) {
+ conf.set(HADOOP_S3_CONNECTION_POOL_SIZE,
String.valueOf(numberOfPartitions));
+ }
if (serviceEndpoint != null) {
// Validation of the URL should be done at hadoop-aws level
@@ -470,7 +476,11 @@ public class S3AuthUtils {
throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
}
if (isDeltaTable(configuration)) {
- validateDeltaTableExists(configuration);
+ try {
+ validateDeltaTableExists(configuration);
+ } catch (AlgebricksException e) {
+ throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e);
+ }
}
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index a2b50e1fc0..d8dd478da7 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -292,4 +292,10 @@ public class S3Utils {
allObjects.put("folders", folders);
return allObjects;
}
+
+    public static String getPath(Map<String, String> configuration) {
+        return S3Constants.HADOOP_S3_PROTOCOL + "://"
+                + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+    }
 }
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 74a664da35..bfd35fcec3 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -22,6 +22,8 @@ import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR
 import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
 import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
@@ -140,9 +142,11 @@ public class GCSUtils {
*/
     public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
             IWarningCollector collector) throws CompilationException {
-
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableProperties(configuration);
+        }
         // check if the format property is present
-        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+        else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
         }
@@ -224,6 +228,11 @@ public class GCSUtils {
* @param configuration properties
* @param numberOfPartitions number of partitions in the cluster
*/
+    public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration)
+            throws AlgebricksException {
+        configureHdfsJobConf(conf, configuration, 0);
+    }
+
     public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions)
             throws AlgebricksException {
         String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
@@ -267,4 +276,10 @@ public class GCSUtils {
conf.set(HADOOP_ENDPOINT, endpoint);
}
}
+
+    public static String getPath(Map<String, String> configuration) {
+        return GCSConstants.HADOOP_GCS_PROTOCOL + "://"
+                + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+    }
 }
diff --git
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 2c15b5a844..1f25c4baf2 100644
---
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -28,4 +28,5 @@ org.apache.asterix.external.input.record.reader.azure.blob.AzureBlobReaderFactory
org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
org.apache.asterix.external.input.record.reader.azure.parquet.AzureDataLakeParquetReaderFactory
-org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory
\ No newline at end of file
+org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory
+org.apache.asterix.external.input.record.reader.gcs.delta.GCSDeltaReaderFactory
\ No newline at end of file