This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b96e2e181e [ASTERIXDB-3503][EXT] Deltalake format support
b96e2e181e is described below

commit b96e2e181ef4960aeabbb2c1ff228b7dae8048e7
Author: ayush.tripathi <[email protected]>
AuthorDate: Wed Aug 21 15:49:53 2024 +0530

    [ASTERIXDB-3503][EXT] Deltalake format support
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    Creating an external dataset from a given Deltalake table path (S3 as source).
    
    Change-Id: Iff608397aab711f324861fe83eeb428f73682912
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18708
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ayush Tripathi <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../external_dataset/ExternalDatasetTestUtils.java |  34 ++++
 .../aws/AwsS3ExternalDatasetTest.java              |   2 +
 .../deltalake/DeltaTableGenerator.java             | 224 +++++++++++++++++++++
 .../deltalake-empty/deltalake-empty.00.ddl.sqlpp   |  35 ++++
 .../deltalake-empty/deltalake-empty.01.query.sqlpp |  22 ++
 .../deltalake-modified-data.00.ddl.sqlpp           |  35 ++++
 .../deltalake-modified-data.01.query.sqlpp         |  22 ++
 .../deltalake-multiple_file_read.00.ddl.sqlpp      |  35 ++++
 .../deltalake-multiple_file_read.01.query.sqlpp    |  22 ++
 .../common/deltalake-empty/deltalake-empty.01.adm  |   0
 .../deltalake-modified-data.01.adm                 |   1 +
 .../deltalake-multiple-file-read/read-data.2.adm   |   4 +
 .../s3/deltalake-empty/deltalake-empty.01.adm      |   0
 .../deltalake-modified-data.01.adm                 |   1 +
 .../external-dataset/s3/deltalake/read-data.2.adm  |   4 +
 .../runtimets/testsuite_external_dataset_s3.xml    |  19 ++
 .../asterix/common/exceptions/ErrorCode.java       |   1 +
 .../src/main/resources/asx_errormsg/en.properties  |   1 +
 asterixdb/asterix-external-data/pom.xml            |   5 +
 .../reader/aws/delta/AwsS3DeltaReaderFactory.java  |  68 +++++++
 .../provider/DatasourceFactoryProvider.java        |   6 +
 .../external/util/ExternalDataConstants.java       |   3 +-
 .../asterix/external/util/ExternalDataUtils.java   | 134 +++++++-----
 .../asterix/external/util/aws/s3/S3Utils.java      |   8 +-
 ...pache.asterix.external.api.IRecordReaderFactory |   1 +
 25 files changed, 635 insertions(+), 52 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index 79d298a7b3..db1bf5039b 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -23,6 +23,7 @@ import static 
org.apache.asterix.test.external_dataset.aws.AwsS3ExternalDatasetT
 import static 
org.apache.asterix.test.external_dataset.aws.AwsS3ExternalDatasetTest.BROWSE_CONTAINER;
 import static 
org.apache.asterix.test.external_dataset.aws.AwsS3ExternalDatasetTest.DYNAMIC_PREFIX_AT_START_CONTAINER;
 import static 
org.apache.asterix.test.external_dataset.aws.AwsS3ExternalDatasetTest.FIXED_DATA_CONTAINER;
+import static 
org.apache.asterix.test.external_dataset.deltalake.DeltaTableGenerator.DELTA_GEN_BASEDIR;
 import static 
org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.BINARY_GEN_BASEDIR;
 
 import java.io.BufferedWriter;
@@ -36,9 +37,11 @@ import java.nio.file.Paths;
 import java.util.Collection;
 
 import org.apache.asterix.test.external_dataset.avro.AvroFileConverterUtil;
+import org.apache.asterix.test.external_dataset.deltalake.DeltaTableGenerator;
 import 
org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -114,6 +117,13 @@ public class ExternalDatasetTestUtils {
         AvroFileConverterUtil.convertToAvro(basePath, avroRawJsonDir, 
AVRO_GEN_BASEDIR);
     }
 
+    public static void createDeltaTable() throws IOException {
+        File basePath = new File(".");
+        // cleaning directory
+        BinaryFileConverterUtil.cleanBinaryDirectory(basePath, 
DELTA_GEN_BASEDIR);
+        DeltaTableGenerator.prepareDeltaTableContainer(new Configuration());
+    }
+
     /**
      * Generate binary files (e.g., parquet files)
      */
@@ -192,6 +202,10 @@ public class ExternalDatasetTestUtils {
         loadAvroFiles();
         LOGGER.info("Avro files added successfully");
 
+        LOGGER.info("Adding Delta Table files to the bucket");
+        loadDeltaTableFiles();
+        LOGGER.info("Delta files added successfully");
+
         LOGGER.info("Files added successfully");
     }
 
@@ -412,6 +426,26 @@ public class ExternalDatasetTestUtils {
         }
     }
 
+    private static void loadDeltaTableFiles() {
+        String generatedDataBasePath = DELTA_GEN_BASEDIR;
+        loadDeltaDirectory(generatedDataBasePath, "/empty_delta_table", 
PARQUET_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, 
"/empty_delta_table/_delta_log", JSON_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, "/modified_delta_table", 
PARQUET_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, 
"/modified_delta_table/_delta_log", JSON_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, 
"/multiple_file_delta_table", PARQUET_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, 
"/multiple_file_delta_table/_delta_log", JSON_FILTER, "delta-data/");
+    }
+
+    private static void loadDeltaDirectory(String dataBasePath, String 
rootPath, FilenameFilter filter,
+            String definitionPart) {
+        Collection<File> files = 
IoUtil.getMatchingFiles(Paths.get(dataBasePath + rootPath), filter);
+        for (File file : files) {
+            String fileName = file.getName();
+            String externalFilterDefinition = 
file.getParent().substring(dataBasePath.length() + 1) + "/";
+            loadData(file.getParent(), "", fileName, definitionPart + 
externalFilterDefinition, "", false, false);
+        }
+    }
+
     private static void loadDirectory(String dataBasePath, String rootPath, 
FilenameFilter filter) {
         File dir = new File(dataBasePath, rootPath);
         if (!dir.exists() || !dir.isDirectory()) {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
index 9a2bfbcc7c..665b8057ec 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.
 import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createAvroFilesRecursively;
 import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFiles;
 import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFilesRecursively;
+import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createDeltaTable;
 import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setDataPaths;
 import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setUploaders;
 import static 
org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.DEFAULT_PARQUET_SRC_PATH;
@@ -199,6 +200,7 @@ public class AwsS3ExternalDatasetTest {
         createBinaryFilesRecursively(EXTERNAL_FILTER_DATA_PATH);
         createAvroFiles(DEFAULT_PARQUET_SRC_PATH);
         createAvroFilesRecursively(EXTERNAL_FILTER_DATA_PATH);
+        createDeltaTable();
         setNcEndpoints(testExecutor);
         startAwsS3MockServer();
     }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
new file mode 100644
index 0000000000..f041fa2e9c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.external_dataset.deltalake;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Operation;
+import io.delta.standalone.OptimisticTransaction;
+import io.delta.standalone.actions.Action;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.Metadata;
+import io.delta.standalone.actions.RemoveFile;
+import io.delta.standalone.types.IntegerType;
+import io.delta.standalone.types.StringType;
+import io.delta.standalone.types.StructField;
+import io.delta.standalone.types.StructType;
+
+public class DeltaTableGenerator {
+    public static final String DELTA_GEN_BASEDIR = "target" + 
File.separatorChar + "generated_delta_files";
+    public static final String DELTA_EMPTY_TABLE =
+            "target" + File.separatorChar + "generated_delta_files" + 
File.separatorChar + "empty_delta_table";
+    public static final String DELTA_MODIFIED_TABLE =
+            "target" + File.separatorChar + "generated_delta_files" + 
File.separatorChar + "modified_delta_table";
+    public static final String DELTA_MULTI_FILE_TABLE =
+            "target" + File.separatorChar + "generated_delta_files" + 
File.separatorChar + "multiple_file_delta_table";
+
+    public static void prepareDeltaTableContainer(Configuration conf) {
+        File basePath = new File(".");
+        cleanBinaryDirectory(basePath, DELTA_GEN_BASEDIR);
+        prepareMultipleFilesTable(conf);
+        prepareModifiedTable(conf);
+        prepareEmptyTable(conf);
+    }
+
+    public static void cleanBinaryDirectory(File localDataRoot, String 
binaryFilesPath) {
+        try {
+            File destPath = new File(localDataRoot, binaryFilesPath);
+            //Delete old generated files
+            if (destPath.exists()) {
+                IoUtil.delete(destPath);
+            }
+            //Create new directory
+            Files.createDirectory(Paths.get(destPath.getAbsolutePath()));
+        } catch (IOException e) {
+
+        }
+
+    }
+
+    public static void prepareEmptyTable(Configuration conf) {
+        List<Action> actions = List.of();
+        DeltaLog log = DeltaLog.forTable(conf, DELTA_EMPTY_TABLE);
+        OptimisticTransaction txn = log.startTransaction();
+        Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new 
ArrayList<>())
+                .schema(new StructType().add(new StructField("id", new 
IntegerType(), true))
+                        .add(new StructField("data", new StringType(), true)))
+                .build();
+        txn.updateMetadata(metaData);
+        txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), 
"deltalake-table-create");
+    }
+
+    public static void prepareModifiedTable(Configuration conf) {
+        Schema schema = 
SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("data").endRecord();
+        try {
+            Path path = new Path(DELTA_MODIFIED_TABLE, "firstFile.parquet");
+            ParquetWriter<GenericData.Record> writer =
+                    AvroParquetWriter.<GenericData.Record> 
builder(path).withConf(conf).withSchema(schema).build();
+
+            List<GenericData.Record> fileFirstSnapshotRecords = List.of(new 
GenericData.Record(schema),
+                    new GenericData.Record(schema), new 
GenericData.Record(schema));
+            List<GenericData.Record> fileSecondSnapshotRecords = List.of(new 
GenericData.Record(schema));
+
+            fileFirstSnapshotRecords.get(0).put("id", 0);
+            fileFirstSnapshotRecords.get(0).put("data", "vibrant_mclean");
+
+            fileFirstSnapshotRecords.get(1).put("id", 1);
+            fileFirstSnapshotRecords.get(1).put("data", "frosty_wilson");
+
+            fileFirstSnapshotRecords.get(2).put("id", 2);
+            fileFirstSnapshotRecords.get(2).put("data", "serene_kirby");
+
+            fileSecondSnapshotRecords.get(0).put("id", 2);
+            fileSecondSnapshotRecords.get(0).put("data", "serene_kirby");
+
+            for (GenericData.Record record : fileFirstSnapshotRecords) {
+                writer.write(record);
+            }
+
+            long size = writer.getDataSize();
+            writer.close();
+
+            List<Action> actions = List.of(new AddFile("firstFile.parquet", 
new HashMap<>(), size,
+                    System.currentTimeMillis(), true, null, null));
+            DeltaLog log = DeltaLog.forTable(conf, DELTA_MODIFIED_TABLE);
+            OptimisticTransaction txn = log.startTransaction();
+            Metadata metaData = 
txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
+                    .schema(new StructType().add(new StructField("id", new 
IntegerType(), true))
+                            .add(new StructField("data", new StringType(), 
true)))
+                    .build();
+            txn.updateMetadata(metaData);
+            txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), 
"deltalake-table-create");
+
+            Path path2 = new Path(DELTA_MODIFIED_TABLE, "secondFile.parquet");
+            ParquetWriter<GenericData.Record> writer2 =
+                    AvroParquetWriter.<GenericData.Record> 
builder(path2).withConf(conf).withSchema(schema).build();
+
+            for (GenericData.Record record : fileSecondSnapshotRecords) {
+                writer2.write(record);
+            }
+            long size2 = writer2.getDataSize();
+            writer2.close();
+            AddFile addFile = new AddFile("firstFile.parquet", new 
HashMap<>(), size, System.currentTimeMillis(), true,
+                    null, null);
+            RemoveFile removeFile = addFile.remove();
+            List<Action> actions3 = List.of(removeFile, new 
AddFile("secondFile.parquet", new HashMap<>(), size2,
+                    System.currentTimeMillis(), true, null, null));
+            OptimisticTransaction txn3 = log.startTransaction();
+            txn3.commit(actions3, new Operation(Operation.Name.DELETE), 
"deltalake-table-delete");
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void prepareMultipleFilesTable(Configuration conf) {
+        Schema schema = 
SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name")
+                .requiredString("age").endRecord();
+        try {
+            Path path = new Path(DELTA_MULTI_FILE_TABLE, "firstFile.parquet");
+            ParquetWriter<GenericData.Record> writer =
+                    AvroParquetWriter.<GenericData.Record> 
builder(path).withConf(conf).withSchema(schema).build();
+
+            List<GenericData.Record> fileFirstSnapshotRecords = List.of(new 
GenericData.Record(schema),
+                    new GenericData.Record(schema), new 
GenericData.Record(schema));
+            List<GenericData.Record> fileSecondSnapshotRecords = List.of(new 
GenericData.Record(schema));
+
+            fileFirstSnapshotRecords.get(0).put("id", 0);
+            fileFirstSnapshotRecords.get(0).put("name", "Cooper");
+            fileFirstSnapshotRecords.get(0).put("age", "42");
+
+            fileFirstSnapshotRecords.get(1).put("id", 1);
+            fileFirstSnapshotRecords.get(1).put("name", "Murphy");
+            fileFirstSnapshotRecords.get(1).put("age", "16");
+
+            fileFirstSnapshotRecords.get(2).put("id", 2);
+            fileFirstSnapshotRecords.get(2).put("name", "Mann");
+            fileFirstSnapshotRecords.get(2).put("age", "45");
+
+            fileSecondSnapshotRecords.get(0).put("id", 3);
+            fileSecondSnapshotRecords.get(0).put("name", "Brand");
+            fileSecondSnapshotRecords.get(0).put("age", "35");
+
+            for (GenericData.Record record : fileFirstSnapshotRecords) {
+                writer.write(record);
+            }
+
+            long size = writer.getDataSize();
+            writer.close();
+
+            List<Action> actions = List.of(new AddFile("firstFile.parquet", 
new HashMap<>(), size,
+                    System.currentTimeMillis(), true, null, null));
+            DeltaLog log = DeltaLog.forTable(conf, DELTA_MULTI_FILE_TABLE);
+            OptimisticTransaction txn = log.startTransaction();
+            Metadata metaData = 
txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
+                    .schema(new StructType().add(new StructField("id", new 
IntegerType(), true))
+                            .add(new StructField("name", new StringType(), 
true))
+                            .add(new StructField("age", new IntegerType(), 
true)))
+                    .build();
+            txn.updateMetadata(metaData);
+            txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), 
"deltalake-table-create");
+
+            Path path2 = new Path(DELTA_MULTI_FILE_TABLE, 
"secondFile.parquet");
+            ParquetWriter<GenericData.Record> writer2 =
+                    AvroParquetWriter.<GenericData.Record> 
builder(path2).withConf(conf).withSchema(schema).build();
+
+            for (GenericData.Record record : fileSecondSnapshotRecords) {
+                writer2.write(record);
+            }
+
+            long size2 = writer2.getDataSize();
+            writer2.close();
+
+            List<Action> actions2 = List.of(new AddFile("secondFile.parquet", 
new HashMap<>(), size2,
+                    System.currentTimeMillis(), true, null, null));
+            OptimisticTransaction txn2 = log.startTransaction();
+            txn2.commit(actions2, new Operation(Operation.Name.WRITE), 
"deltalake-table-create");
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
new file mode 100644
index 0000000000..f21f0131fc
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL DATASET DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="my-table-empty"),
+   ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.01.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.01.query.sqlpp
new file mode 100644
index 0000000000..84e79142a8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.00.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.00.ddl.sqlpp
new file mode 100644
index 0000000000..8adab7721c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL DATASET DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/modified_delta_table"),
+   ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.query.sqlpp
new file mode 100644
index 0000000000..84e79142a8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.00.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.00.ddl.sqlpp
new file mode 100644
index 0000000000..61c5c3ac38
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL DATASET DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/multiple_file_delta_table"),
+   ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.01.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.01.query.sqlpp
new file mode 100644
index 0000000000..bfd65819f2
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds order by ds.id;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-empty/deltalake-empty.01.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-empty/deltalake-empty.01.adm
new file mode 100644
index 0000000000..e69de29bb2
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.adm
new file mode 100644
index 0000000000..dfdc3b0a1e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.adm
@@ -0,0 +1 @@
+{ "id": 2, "data": "serene_kirby" }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.2.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.2.adm
new file mode 100644
index 0000000000..afae366266
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.2.adm
@@ -0,0 +1,4 @@
+{ "id": 0, "name": "Cooper", "age": "42" }
+{ "id": 1, "name": "Murphy", "age": "16" }
+{ "id": 2, "name": "Mann", "age": "45" }
+{ "id": 3, "name": "Brand", "age": "35" }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake-empty/deltalake-empty.01.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake-empty/deltalake-empty.01.adm
new file mode 100644
index 0000000000..e69de29bb2
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake-modified-data/deltalake-modified-data.01.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake-modified-data/deltalake-modified-data.01.adm
new file mode 100644
index 0000000000..dfdc3b0a1e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake-modified-data/deltalake-modified-data.01.adm
@@ -0,0 +1 @@
+{ "id": 2, "data": "serene_kirby" }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake/read-data.2.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake/read-data.2.adm
new file mode 100644
index 0000000000..afae366266
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake/read-data.2.adm
@@ -0,0 +1,4 @@
+{ "id": 0, "name": "Cooper", "age": "42" }
+{ "id": 1, "name": "Murphy", "age": "16" }
+{ "id": 2, "name": "Mann", "age": "45" }
+{ "id": 3, "name": "Brand", "age": "35" }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 5501aec68c..48e102fb1f 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -345,6 +345,25 @@
       </compilation-unit>
     </test-case>
     <!-- Parquet Tests End -->
+    <!-- Deltalake Tests Start -->
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-multiple-file-read">
+        <placeholder name="adapter" value="S3" />
+        <output-dir 
compare="Text">common/deltalake-multiple-file-read</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-empty">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">common/deltalake-empty</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-modified-data">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">common/deltalake-modified-data</output-dir>
+      </compilation-unit>
+    </test-case>
     <test-case FilePath="external-dataset">
       <compilation-unit name="common/avro/avro-types/avro-map">
         <placeholder name="adapter" value="S3" />
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 832a7cccc0..25e24bb53d 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -98,6 +98,7 @@ public enum ErrorCode implements IError {
     UNSUPPORTED_COLUMN_TYPE(67),
     INVALID_KEY_TYPE(68),
     FAILED_TO_READ_KEY(69),
+    INVALID_DELTA_PARAMETER(70),
 
     UNSUPPORTED_JRE(100),
 
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 748a2b9d29..817127f539 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -41,6 +41,7 @@
 1009 = A returning expression cannot contain dataset access
 37,1091 = Type mismatch: expected value of type %1$s, but got the value of 
type %2$s
 51 = Incomparable input types: %1$s and %2$s
+70 = Table Type 'delta' supports parquet file formats only.
 
 # Data errors
 6 = Invalid format for %1$s in %2$s
diff --git a/asterixdb/asterix-external-data/pom.xml 
b/asterixdb/asterix-external-data/pom.xml
index 6a3f891aef..21eaf71c89 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -573,6 +573,11 @@
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-standalone_2.12</artifactId>
+      <version>3.0.0</version>
+    </dependency>
   </dependencies>
   <!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 -->
   <repositories>
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
new file mode 100644
index 0000000000..f5a2cd5820
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static 
org.apache.asterix.external.util.ExternalDataUtils.prepareDeltaTableFormat;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import 
org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public class AwsS3DeltaReaderFactory extends AwsS3ParquetReaderFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration,
+            IWarningCollector warningCollector, 
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+            throws AlgebricksException, HyracksDataException {
+
+        Configuration conf = new Configuration();
+        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, 
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+        conf.set(S3Constants.HADOOP_REGION, 
configuration.get(S3Constants.REGION_FIELD_NAME));
+        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
+        if (serviceEndpoint != null) {
+            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
+        }
+        String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
+                + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+
+        prepareDeltaTableFormat(configuration, conf, tableMetadataPath);
+        super.configure(serviceCtx, configuration, warningCollector, 
filterEvaluatorFactory);
+    }
+
+    @Override
+    public Set<String> getReaderSupportedFormats() {
+        return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
+    }
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index f35dce4aa5..0360c15fb1 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.provider;
 
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.InvocationTargetException;
@@ -121,6 +123,10 @@ public class DatasourceFactoryProvider {
         if (factories.containsKey(adaptorName)) {
             Map<String, Class<?>> formatClassMap = factories.get(adaptorName);
             String format = 
configuration.get(ExternalDataConstants.KEY_FORMAT);
+            if (isDeltaTable(configuration)) {
+                format = configuration.get(ExternalDataConstants.TABLE_FORMAT);
+                return getInstance(formatClassMap.getOrDefault(format, 
formatClassMap.get(DEFAULT_FORMAT)));
+            }
             return getInstance(formatClassMap.getOrDefault(format, 
formatClassMap.get(DEFAULT_FORMAT)));
         }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 0407005438..23aa5dc28e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -124,7 +124,7 @@ public class ExternalDataConstants {
     // a string representing the format of the record (for adapters which 
produces records with additional information like pk or metadata)
     public static final String KEY_RECORD_FORMAT = "record-format";
     public static final String TABLE_FORMAT = "table-format";
-    public static final String ICEBERG_METADATA_LOCATION = "metadata-path";
+    public static final String TABLE_METADATA_LOCATION = "metadata-path";
     public static final int SUPPORTED_ICEBERG_FORMAT_VERSION = 1;
     public static final String KEY_META_TYPE_NAME = "meta-type-name";
     public static final String KEY_ADAPTER_NAME = "adapter-name";
@@ -219,6 +219,7 @@ public class ExternalDataConstants {
     public static final String DUMMY_TYPE_NAME = "typeName";
     public static final String DUMMY_DATAVERSE_NAME = "a.b.c";
     public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg";
+    public static final String FORMAT_DELTA = "delta";
     public static final Set<String> ALL_FORMATS;
     public static final Set<String> TEXTUAL_FORMATS;
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 104f301071..8d332610e8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -108,6 +108,10 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+
 public class ExternalDataUtils {
     private static final Map<ATypeTag, IValueParserFactory> 
valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
     private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
@@ -469,6 +473,10 @@ public class ExternalDataUtils {
         }
 
         if (configuration.containsKey(ExternalDataConstants.TABLE_FORMAT)) {
+            if 
(configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_DELTA))
 {
+                configuration.put(ExternalDataConstants.KEY_PARSER, 
ExternalDataConstants.FORMAT_NOOP);
+                configuration.put(ExternalDataConstants.KEY_FORMAT, 
ExternalDataConstants.FORMAT_PARQUET);
+            }
             prepareTableFormat(configuration);
         }
     }
@@ -478,68 +486,96 @@ public class ExternalDataUtils {
      *
      * @param configuration external data configuration
      */
-    public static void prepareTableFormat(Map<String, String> configuration) 
throws AlgebricksException {
-        // Apache Iceberg table format
-        if 
(configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_APACHE_ICEBERG))
 {
-            Configuration conf = new Configuration();
-
-            String metadata_path = 
configuration.get(ExternalDataConstants.ICEBERG_METADATA_LOCATION);
-
-            // If the table is in S3
-            if (configuration.get(ExternalDataConstants.KEY_READER)
-                    .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
-
-                conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-                conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
-                        
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-                metadata_path = S3Constants.HADOOP_S3_PROTOCOL + "://"
-                        + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
-                        + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
-            } else if 
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_HDFS))
 {
-                conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
-                        configuration.get(ExternalDataConstants.KEY_HDFS_URL));
-                metadata_path = 
configuration.get(ExternalDataConstants.KEY_HDFS_URL) + '/' + metadata_path;
-            }
-
-            HadoopTables tables = new HadoopTables(conf);
+    public static boolean isDeltaTable(Map<String, String> configuration) {
+        return configuration.containsKey(ExternalDataConstants.TABLE_FORMAT)
+                && 
configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_DELTA);
+    }
 
-            Table icebergTable = tables.load(metadata_path);
+    public static void validateDeltaTableProperties(Map<String, String> 
configuration) throws CompilationException {
+        if (!(configuration.get(ExternalDataConstants.KEY_FORMAT) == null
+                || 
configuration.get(ExternalDataConstants.KEY_FORMAT).equals(ExternalDataConstants.FORMAT_PARQUET)))
 {
+            throw new CompilationException(ErrorCode.INVALID_DELTA_PARAMETER);
+        }
+    }
 
-            if (icebergTable instanceof BaseTable) {
-                BaseTable baseTable = (BaseTable) icebergTable;
+    public static void prepareDeltaTableFormat(Map<String, String> 
configuration, Configuration conf,
+            String tableMetadataPath) {
+        DeltaLog deltaLog = DeltaLog.forTable(conf, tableMetadataPath);
+        Snapshot snapshot = deltaLog.snapshot();
+        List<AddFile> dataFiles = snapshot.getAllFiles();
+        StringBuilder builder = new StringBuilder();
+        for (AddFile batchFile : dataFiles) {
+            builder.append(",");
+            String path = batchFile.getPath();
+            builder.append(tableMetadataPath).append('/').append(path);
+        }
+        if (builder.length() > 0) {
+            builder.deleteCharAt(0);
+        }
+        configuration.put(ExternalDataConstants.KEY_PATH, builder.toString());
+    }
 
-                if (baseTable.operations().current()
-                        .formatVersion() != 
ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION) {
-                    throw new 
AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_FORMAT_VERSION,
-                            "AsterixDB only supports Iceberg version up to "
-                                    + 
ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION);
-                }
    /**
     * Loads the Iceberg table at {@code tableMetadataPath}, verifies its format version,
     * and stores the comma-separated list of its data-file paths under
     * {@link ExternalDataConstants#KEY_PATH} in {@code configuration}.
     *
     * @param configuration     external data configuration (mutated: KEY_PATH is set)
     * @param conf              Hadoop configuration carrying filesystem credentials
     * @param tableMetadataPath location of the Iceberg table metadata
     * @throws AlgebricksException if the table is not a supported BaseTable, uses an
     *                             unsupported format version, or its metadata cannot be read
     */
    public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf,
            String tableMetadataPath) throws AlgebricksException {
        HadoopTables tables = new HadoopTables(conf);
        Table icebergTable = tables.load(tableMetadataPath);

        if (icebergTable instanceof BaseTable) {
            BaseTable baseTable = (BaseTable) icebergTable;

            // Only a single Iceberg format version is supported; reject anything else.
            if (baseTable.operations().current()
                    .formatVersion() != ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION) {
                throw new AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_FORMAT_VERSION,
                        "AsterixDB only supports Iceberg version up to "
                                + ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION);
            }

            // Plan the scan to enumerate concrete data files of the current snapshot.
            try (CloseableIterable<FileScanTask> fileScanTasks = baseTable.newScan().planFiles()) {

                StringBuilder builder = new StringBuilder();

                // Build ",path1,path2,..." then drop the leading comma below.
                for (FileScanTask task : fileScanTasks) {
                    builder.append(",");
                    String path = task.file().path().toString();
                    builder.append(path);
                }

                if (builder.length() > 0) {
                    builder.deleteCharAt(0);
                }

                configuration.put(ExternalDataConstants.KEY_PATH, builder.toString());

            } catch (IOException e) {
                throw new AsterixException(ErrorCode.ERROR_READING_ICEBERG_METADATA, e);
            }

        } else {
            throw new AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_TABLE,
                    "Invalid iceberg base table. Please remove metadata specifiers");
        }
    }
+
    /**
     * Prepares a table-format dataset: resolves the table metadata location for the
     * configured reader (S3 or HDFS) and, for Apache Iceberg tables, expands it into
     * the concrete data-file paths.
     * <p>
     * NOTE(review): Delta tables are not dispatched here — presumably they are handled
     * by the Delta reader factory instead; confirm against the reader wiring.
     *
     * @param configuration external data configuration
     * @throws AlgebricksException if table metadata cannot be loaded or is unsupported
     */
    public static void prepareTableFormat(Map<String, String> configuration) throws AlgebricksException {
        Configuration conf = new Configuration();
        String tableMetadataPath = configuration.get(ExternalDataConstants.TABLE_METADATA_LOCATION);

        // If the table is in S3: pass credentials through and address it via s3://<container>/<definition>.
        if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {

            conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
            conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
            conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
            String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
            if (serviceEndpoint != null) {
                conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
            }
            tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
                    + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
                    + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
        } else if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_HDFS)) {
            // HDFS: qualify the metadata path with the configured filesystem URI.
            conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
                    configuration.get(ExternalDataConstants.KEY_HDFS_URL));
            tableMetadataPath = configuration.get(ExternalDataConstants.KEY_HDFS_URL) + '/' + tableMetadataPath;
        }
        // Apache Iceberg table format
        if (configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_APACHE_ICEBERG)) {
            prepareIcebergTableFormat(configuration, conf, tableMetadataPath);
        }
    }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 6a16913258..891d7f3bfa 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -23,6 +23,8 @@ import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_I
 import static 
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
 import static 
org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
@@ -272,9 +274,11 @@ public class S3Utils {
      */
     public static void validateProperties(Map<String, String> configuration, 
SourceLocation srcLoc,
             IWarningCollector collector) throws CompilationException {
-
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableProperties(configuration);
+        }
         // check if the format property is present
-        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+        else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_FORMAT);
         }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 88f3fcb440..2c15b5a844 100644
--- 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -22,6 +22,7 @@ 
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
 
org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory
 org.apache.asterix.external.input.record.reader.aws.AwsS3ReaderFactory
 
org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory
+org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory
 org.apache.asterix.external.input.record.reader.gcs.GCSReaderFactory
 
org.apache.asterix.external.input.record.reader.azure.blob.AzureBlobReaderFactory
 
org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory

Reply via email to