This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 3311364a512812c5c75d8c864545fc761c456bbf
Author: Peeyush Gupta <[email protected]>
AuthorDate: Tue Jul 22 12:19:12 2025 -0700

    [ASTERIXDB-3633][EXT] Support file splits while reading delta tables
    
     - user model changes: no
     - storage format changes: no
     - interface changes: no
    
    Ext-ref:MB-66319
    Change-Id: Ie6daf3846064326bfe749ad15b508fe27d1721ca
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20135
    Reviewed-by: Peeyush Gupta <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../provider/SqlppCompilationProvider.java         |   3 +-
 .../deltalake-file-nine.02.query.sqlpp             |  23 +++
 .../deltalake-multiple_file_read.02.query.sqlpp    |  23 +++
 .../deltalake-partitioned-file-read.03.query.sqlpp |  23 +++
 .../api/cluster_state_1/cluster_state_1.1.regexadm |   1 +
 .../cluster_state_1_full.1.regexadm                |   1 +
 .../cluster_state_1_less.1.regexadm                |   1 +
 .../deltalake-file-nine/deltalake-file-nine.02.adm |   9 +
 .../deltalake-multiple-file-read/read-data.3.adm   |   4 +
 .../read-data.3.adm                                |   3 +
 .../asterix/common/config/CompilerProperties.java  |   8 +-
 .../input/record/reader/aws/delta/DeltaEngine.java |  47 ++++++
 .../reader/aws/delta/DeltaFileRecordReader.java    |  75 +++++++--
 .../reader/aws/delta/DeltaParquetFileReader.java   | 187 +++++++++++++++++++++
 .../reader/aws/delta/DeltaParquetHandler.java      |  80 +++++++++
 .../reader/aws/delta/DeltaReaderFactory.java       |  95 ++++++++++-
 .../reader/aws/delta/SerializableFileSplit.java    |  45 +++++
 .../metadata/declared/DatasetDataSource.java       |  11 ++
 18 files changed, 614 insertions(+), 25 deletions(-)

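The core mechanism this patch adds is row-group-level splitting: when the new
compiler.deltalake.filesplits option is enabled, DeltaReaderFactory no longer
assigns whole Parquet data files to partitions but reads each file's footer and
emits one split per row group. A minimal sketch of that derivation, using only
the parquet-mr calls the patch itself relies on (ParquetFileReader.open and
getFooter().getBlocks()); the wrapper class name is illustrative:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;

    public class RowGroupSplits {
        // One split per row group: the footer records each row group's starting
        // offset and compressed size, which is all a FileSplit needs.
        public static List<long[]> rowGroupRanges(JobConf conf, Path parquetPath) throws IOException {
            List<long[]> ranges = new ArrayList<>();
            try (ParquetFileReader reader = ParquetFileReader.open(conf, parquetPath)) {
                for (BlockMetaData block : reader.getFooter().getBlocks()) {
                    ranges.add(new long[] { block.getStartingPos(), block.getCompressedSize() });
                }
            }
            return ranges;
        }
    }

Each (start, length) pair is what SerializableFileSplit carries to a partition,
so several partitions can read disjoint row groups of the same large file.
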
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
index 10574b1cb2..ca97aa6734 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
@@ -96,7 +96,8 @@ public class SqlppCompilationProvider implements ILangCompilationProvider {
                 CompilerProperties.COMPILER_QUERY_PLAN_SHAPE_KEY, CompilerProperties.COMPILER_MIN_MEMORY_ALLOCATION_KEY,
                 CompilerProperties.COMPILER_COLUMN_FILTER_KEY, CompilerProperties.COMPILER_BATCH_LOOKUP_KEY,
                 FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
-                CompilerProperties.COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING_KEY, FuzzyUtils.SIM_FUNCTION_PROP_NAME,
+                CompilerProperties.COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING_KEY,
+                CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY, FuzzyUtils.SIM_FUNCTION_PROP_NAME,
                 FuzzyUtils.SIM_THRESHOLD_PROP_NAME, StartFeedStatement.WAIT_FOR_COMPLETION,
                 FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS,
                 SqlppQueryRewriter.INLINE_WITH_OPTION, SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.query.sqlpp
new file mode 100644
index 0000000000..3ee2964132
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+SET `compiler.deltalake.filesplits` "true";
+ SELECT element ds FROM DeltalakeDataset as ds order by id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.02.query.sqlpp
new file mode 100644
index 0000000000..47156dd82a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.02.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+SET `compiler.deltalake.filesplits` "true";
+SELECT element ds FROM DeltalakeDataset as ds order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp
new file mode 100644
index 0000000000..a8fb52984b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+SET `compiler.deltalake.filesplits` "true";
+ SELECT element ds FROM DeltalakeDataset2 as ds where ds.timestamp=datetime("2025-01-01T00:01:20Z") order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 065435b55c..6c78c21b42 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -37,6 +37,7 @@
     "compiler.cbo" : true,
     "compiler.column.filter" : true,
     "compiler.copy.to.write.buffer.size" : 8388608,
+    "compiler.deltalake.filesplits" : false,
     "compiler\.external\.field\.pushdown" : true,
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 28fa1f2b6c..c44e600a3b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -37,6 +37,7 @@
     "compiler.cbo" : true,
     "compiler.column.filter" : true,
     "compiler.copy.to.write.buffer.size" : 8388608,
+    "compiler.deltalake.filesplits" : false,
     "compiler\.external\.field\.pushdown" : true,
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 8e0e9a6b00..10eab67e07 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -37,6 +37,7 @@
     "compiler.cbo" : true,
     "compiler.column.filter" : true,
     "compiler.copy.to.write.buffer.size" : 8388608,
+    "compiler.deltalake.filesplits" : false,
     "compiler\.external\.field\.pushdown" : true,
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.adm
new file mode 100644
index 0000000000..500f6a9970
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.adm
@@ -0,0 +1,9 @@
+{ "id": 0, "name": "Cooper" }
+{ "id": 1, "name": "Adam" }
+{ "id": 2, "name": "Third" }
+{ "id": 3, "name": "Fourth" }
+{ "id": 4, "name": "Five" }
+{ "id": 5, "name": "Six" }
+{ "id": 6, "name": "Seven" }
+{ "id": 7, "name": "Eight" }
+{ "id": 8, "name": "Nine" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.3.adm
new file mode 100644
index 0000000000..afae366266
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.3.adm
@@ -0,0 +1,4 @@
+{ "id": 0, "name": "Cooper", "age": "42" }
+{ "id": 1, "name": "Murphy", "age": "16" }
+{ "id": 2, "name": "Mann", "age": "45" }
+{ "id": 3, "name": "Brand", "age": "35" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.3.adm
new file mode 100644
index 0000000000..c185038fa0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.3.adm
@@ -0,0 +1,3 @@
+{ "id": 0, "name": "Order 1", "timestamp": datetime("2025-01-01T00:01:20.000") }
+{ "id": 1, "name": "Order 2", "timestamp": datetime("2025-01-01T00:01:20.000") }
+{ "id": 2, "name": "Order 3", "timestamp": datetime("2025-01-01T00:01:20.000") }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index a7e39c9b2a..2adfd7d9cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -157,7 +157,8 @@ public class CompilerProperties extends AbstractProperties {
                 getRangedIntegerType(0, Integer.MAX_VALUE),
                 128,
                 "Maximum occurrences of a variable allowed in an expression for inlining"),
-        COMPILER_ORDERED_FIELDS(BOOLEAN, AlgebricksConfig.ORDERED_FIELDS, "Enable/disable select order list");
+        COMPILER_ORDERED_FIELDS(BOOLEAN, AlgebricksConfig.ORDERED_FIELDS, "Enable/disable select order list"),
+        COMPILER_DELTALAKE_FILESPLITS(BOOLEAN, false, "Enable/disable delta lake file splits");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -245,6 +246,7 @@ public class CompilerProperties extends AbstractProperties {
     public static final String COMPILER_ORDERED_FIELDS_KEY = Option.COMPILER_ORDERED_FIELDS.ini();
 
     public static final int COMPILER_PARALLELISM_AS_STORAGE = 0;
+    public static final String COMPILER_DELTALAKE_FILESPLITS_KEY = Option.COMPILER_DELTALAKE_FILESPLITS.ini();
 
     public CompilerProperties(PropertiesAccessor accessor) {
         super(accessor);
@@ -387,4 +389,8 @@ public class CompilerProperties extends AbstractProperties {
     public int getMaxVariableOccurrencesForInlining() {
         return accessor.getInt(Option.COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING);
     }
+
+    public boolean isDeltaLakeFileSplitsEnabled() {
+        return accessor.getBoolean(Option.COMPILER_DELTALAKE_FILESPLITS);
+    }
 }
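
The new option resolves like the other per-request compiler options: a
query-level SET (as the new tests do with SET `compiler.deltalake.filesplits`
"true";) wins over the cluster-wide default of false. A minimal sketch of that
resolution order, mirroring getFileSplitsConfig() in DeltaReaderFactory below:

    import java.util.Map;

    public class FileSplitsOption {
        // Query-level override first, cluster-wide CompilerProperties default second.
        public static boolean fileSplitsEnabled(Map<String, String> queryConfig, boolean clusterDefault) {
            String v = queryConfig.get("compiler.deltalake.filesplits");
            return v != null ? Boolean.parseBoolean(v) : clusterDefault;
        }
    }
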
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
new file mode 100644
index 0000000000..141040569b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.defaults.engine.fileio.FileIO;
+import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO;
+import io.delta.kernel.engine.ParquetHandler;
+
+public class DeltaEngine extends DefaultEngine {
+
+    private final FileIO fileIO;
+    private final Configuration conf;
+
+    protected DeltaEngine(FileIO fileIO, Configuration conf) {
+        super(fileIO);
+        this.fileIO = fileIO;
+        this.conf = conf;
+    }
+
+    public static DeltaEngine create(Configuration configuration) {
+        return new DeltaEngine(new HadoopFileIO(configuration), configuration);
+    }
+
+    public ParquetHandler getParquetHandler() {
+        return new DeltaParquetHandler(this.fileIO, this.conf);
+    }
+
+}
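
DeltaEngine exists only to swap in the split-aware Parquet handler. A quick
check of what the override changes (hypothetical harness, assuming the classes
in this patch are on the classpath):

    import org.apache.asterix.external.input.record.reader.aws.delta.DeltaEngine;
    import org.apache.hadoop.conf.Configuration;

    import io.delta.kernel.engine.ParquetHandler;

    public class EngineCheck {
        public static void main(String[] args) {
            // getParquetHandler() now returns DeltaParquetHandler rather than
            // the kernel's DefaultParquetHandler.
            DeltaEngine engine = DeltaEngine.create(new Configuration());
            ParquetHandler handler = engine.getParquetHandler();
            System.out.println(handler.getClass().getSimpleName());
        }
    }
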
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 121a76b2e8..a9d60d6244 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,11 +19,13 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
 import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -70,9 +72,12 @@ public class DeltaFileRecordReader implements IRecordReader<Row> {
     private Row scanFile;
     private CloseableIterator<Row> rows;
     private Optional<Predicate> filterPredicate;
+    private final boolean usingSplits;
+    private List<Map.Entry<String, List<SerializableFileSplit>>> scanAndSplits;
 
     public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config,
             String filterExpressionStr) throws HyracksDataException {
+        this.usingSplits = false;
         JobConf conf = config.getConf();
         this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
@@ -88,19 +93,47 @@ public class DeltaFileRecordReader implements IRecordReader<Row> {
         if (scanFiles.size() > 0) {
             this.fileIndex = 0;
             this.scanFile = scanFiles.get(0);
-            this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
-            this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
-            this.filterPredicate = PredicateSerDe.deserializeExpressionFromJson(filterExpressionStr);
-            try {
+            initializeDataIterators(filterExpressionStr);
+        }
+    }
+
+    public DeltaFileRecordReader(Map<String, List<SerializableFileSplit>> splitsMap, String serScanState,
+            ConfFactory config, String filterExpressionStr) throws HyracksDataException {
+        JobConf conf = config.getConf();
+        this.usingSplits = true;
+        this.engine = DeltaEngine.create(conf);
+        scanAndSplits = new ArrayList<>(splitsMap.entrySet());
+        this.scanState = RowSerDe.deserializeRowFromJson(serScanState);
+        this.physicalReadSchema = null;
+        this.physicalDataIter = null;
+        this.dataIter = null;
+        this.record = new GenericRecord<>();
+        if (scanAndSplits.size() > 0) {
+            this.fileIndex = 0;
+            this.scanFile = RowSerDe.deserializeRowFromJson(scanAndSplits.get(0).getKey());
+            initializeDataIterators(filterExpressionStr);
+        }
+    }
+
+    private void initializeDataIterators(String filterExpressionStr) throws HyracksDataException {
+        this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
+        this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+        this.filterPredicate = PredicateSerDe.deserializeExpressionFromJson(filterExpressionStr);
+        try {
+            if (usingSplits) {
+                this.physicalDataIter = ((DeltaParquetHandler) engine.getParquetHandler()).readParquetSplits(
+                        toCloseableIterator(scanAndSplits.get(fileIndex).getValue().iterator()), physicalReadSchema,
+                        filterPredicate);
+            } else {
                 this.physicalDataIter = engine.getParquetHandler()
+                        .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate);
-                this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
-                if (dataIter.hasNext()) {
-                    rows = dataIter.next().getRows();
-                }
-            } catch (IOException e) {
-                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
             }
+            this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+            if (dataIter.hasNext()) {
+                rows = dataIter.next().getRows();
+            }
+        } catch (IOException e) {
+            throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
         }
     }
 
@@ -112,6 +145,9 @@ public class DeltaFileRecordReader implements IRecordReader<Row> {
         if (physicalDataIter != null) {
             physicalDataIter.close();
         }
+        if (rows != null) {
+            rows.close();
+        }
     }
 
     @Override
@@ -121,14 +157,25 @@ public class DeltaFileRecordReader implements IRecordReader<Row> {
         } else if (dataIter != null && dataIter.hasNext()) {
             rows = dataIter.next().getRows();
             return this.hasNext();
-        } else if (fileIndex < scanFiles.size() - 1) {
+        } else if ((!usingSplits && fileIndex < scanFiles.size() - 1)
+                || (usingSplits && fileIndex < scanAndSplits.size() - 1)) {
             fileIndex++;
-            scanFile = scanFiles.get(fileIndex);
+            if (usingSplits) {
+                scanFile = RowSerDe.deserializeRowFromJson(scanAndSplits.get(fileIndex).getKey());
+            } else {
+                scanFile = scanFiles.get(fileIndex);
+            }
             fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
             physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
             try {
-                physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
-                        physicalReadSchema, filterPredicate);
+                if (usingSplits) {
+                    this.physicalDataIter = ((DeltaParquetHandler) engine.getParquetHandler()).readParquetSplits(
+                            toCloseableIterator(scanAndSplits.get(fileIndex).getValue().iterator()), physicalReadSchema,
+                            filterPredicate);
+                } else {
+                    physicalDataIter = engine.getParquetHandler().readParquetFiles(
+                            singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate);
+                }
                 dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetFileReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetFileReader.java
new file mode 100644
index 0000000000..a01bea8df5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetFileReader.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.defaults.internal.parquet.ParquetFilterUtils.toParquetFilter;
+import static java.util.Objects.requireNonNull;
+import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.setFilterPredicate;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetRecordReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.engine.fileio.FileIO;
+import io.delta.kernel.defaults.internal.parquet.ParquetFileReader;
+import io.delta.kernel.exceptions.KernelEngineException;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+
+public class DeltaParquetFileReader extends ParquetFileReader {
+
+    private final FileIO fileIO;
+    private final int maxBatchSize;
+    private final Configuration conf;
+
+    public DeltaParquetFileReader(FileIO fileIO, Configuration conf) {
+        super(fileIO);
+        this.fileIO = requireNonNull(fileIO, "fileIO is null");
+        this.conf = requireNonNull(conf, "conf is null");
+        this.maxBatchSize =
+                fileIO.getConf("delta.kernel.default.parquet.reader.batch-size").map(Integer::valueOf).orElse(1024);
+    }
+
+    public CloseableIterator<ColumnarBatch> read(InputSplit split, StructType schema, Optional<Predicate> predicate) {
+
+        final boolean hasRowIndexCol = schema.indexOf(StructField.METADATA_ROW_INDEX_COLUMN_NAME) >= 0
+                && schema.get(StructField.METADATA_ROW_INDEX_COLUMN_NAME).isMetadataColumn();
+
+        return new CloseableIterator<ColumnarBatch>() {
+            private final BatchReadSupport readSupport = new BatchReadSupport(maxBatchSize, schema);
+            private ParquetRecordReader<Object> reader;
+            private boolean hasNotConsumedNextElement;
+
+            @Override
+            public void close() throws IOException {
+                Utils.closeCloseables(reader);
+            }
+
+            @Override
+            public boolean hasNext() {
+                initParquetReaderIfRequired();
+                try {
+                    if (hasNotConsumedNextElement) {
+                        return true;
+                    }
+
+                    hasNotConsumedNextElement = reader.nextKeyValue() && reader.getCurrentValue() != null;
+                    return hasNotConsumedNextElement;
+                } catch (IOException | InterruptedException ex) {
+                    throw new KernelEngineException("Error reading Parquet file: " + ((FileSplit) split).getPath(), ex);
+                }
+            }
+
+            @Override
+            public ColumnarBatch next() {
+                if (!hasNotConsumedNextElement) {
+                    throw new NoSuchElementException();
+                }
+                int batchSize = 0;
+                do {
+                    hasNotConsumedNextElement = false;
+                    // hasNext reads to row to confirm there is a next element.
+                    // get the row index only if required by the read schema
+                    long rowIndex = 0;
+                    try {
+                        rowIndex = hasRowIndexCol ? reader.getCurrentRowIndex() : -1;
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                    readSupport.finalizeCurrentRow(rowIndex);
+                    batchSize++;
+                } while (batchSize < maxBatchSize && hasNext());
+
+                return readSupport.getDataAsColumnarBatch(batchSize);
+            }
+
+            private void initParquetReaderIfRequired() {
+                if (reader == null) {
+                    org.apache.parquet.hadoop.ParquetFileReader fileReader = null;
+                    try {
+                        Configuration confCopy = conf;
+                        Path filePath = ((FileSplit) split).getPath();
+
+                        // We need physical schema in order to construct a filter that can be
+                        // pushed into the `parquet-mr` reader. For that reason read the footer
+                        // in advance.
+                        ParquetMetadata footer =
+                                org.apache.parquet.hadoop.ParquetFileReader.readFooter(confCopy, filePath);
+
+                        MessageType parquetSchema = footer.getFileMetaData().getSchema();
+                        Optional<FilterPredicate> parquetPredicate =
+                                predicate.flatMap(predicate -> toParquetFilter(parquetSchema, predicate));
+
+                        if (parquetPredicate.isPresent()) {
+                            // clone the configuration to avoid modifying the original one
+                            confCopy = new Configuration(confCopy);
+
+                            setFilterPredicate(confCopy, parquetPredicate.get());
+                            // Disable the record level filtering as the `parquet-mr` evaluates
+                            // the filter once the entire record has been materialized. Instead,
+                            // we use the predicate to prune the row groups which is more efficient.
+                            // In the future, we can consider using the record level filtering if a
+                            // native Parquet reader is implemented in Kernel default module.
+                            confCopy.set(RECORD_FILTERING_ENABLED, "false");
+                            confCopy.set(DICTIONARY_FILTERING_ENABLED, "false");
+                            confCopy.set(COLUMN_INDEX_FILTERING_ENABLED, "false");
+                        }
+
+                        // Pass the already read footer to the reader to avoid reading it again.
+                        fileReader = new ParquetFileReaderWithFooter(filePath, confCopy, footer);
+                        reader = new ParquetRecordReader<>(readSupport, ParquetInputFormat.getFilter(confCopy));
+                        reader.initialize((FileSplit) split, confCopy, Reporter.NULL);
+                    } catch (IOException e) {
+                        Utils.closeCloseablesSilently(fileReader, reader);
+                        throw new KernelEngineException("Error reading Parquet file: " + ((FileSplit) split).getPath(),
+                                e);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Wrapper around {@link org.apache.parquet.hadoop.ParquetFileReader} to allow using the
+     * provided footer instead of reading it again. We read the footer in advance to construct a
+     * predicate for filtering rows.
+     */
+    private static class ParquetFileReaderWithFooter extends org.apache.parquet.hadoop.ParquetFileReader {
+        private final ParquetMetadata footer;
+
+        ParquetFileReaderWithFooter(Path filePath, Configuration configuration, ParquetMetadata footer)
+                throws IOException {
+            super(configuration, filePath, footer);
+            this.footer = requireNonNull(footer, "footer is null");
+        }
+
+        @Override
+        public ParquetMetadata getFooter() {
+            return footer; // return the footer passed in the constructor
+        }
+    }
+}
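
The next() implementation above accumulates rows into fixed-size columnar
batches. Stripped of the Parquet specifics, the loop shape is just this
(a generic sketch, not part of the patch):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class Batching {
        // Consume rows until the batch is full or the source runs dry, then
        // emit one batch; repeat until the source is exhausted.
        public static <T> List<List<T>> batches(Iterator<T> rows, int maxBatchSize) {
            List<List<T>> out = new ArrayList<>();
            while (rows.hasNext()) {
                List<T> batch = new ArrayList<>(maxBatchSize);
                while (batch.size() < maxBatchSize && rows.hasNext()) {
                    batch.add(rows.next());
                }
                out.add(batch);
            }
            return out;
        }
    }
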
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetHandler.java
new file mode 100644
index 0000000000..8c78999110
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.engine.DefaultParquetHandler;
+import io.delta.kernel.defaults.engine.fileio.FileIO;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+
+public class DeltaParquetHandler extends DefaultParquetHandler {
+
+    private final FileIO fileIO;
+    private final Configuration conf;
+
+    public DeltaParquetHandler(FileIO fileIO, Configuration conf) {
+        super(fileIO);
+        this.fileIO = fileIO;
+        this.conf = conf;
+    }
+
+    public CloseableIterator<ColumnarBatch> readParquetSplits(final CloseableIterator<SerializableFileSplit> splits,
+            final StructType physicalSchema, final Optional<Predicate> predicate) throws IOException {
+        return new CloseableIterator<>() {
+            private final DeltaParquetFileReader batchReader;
+            private CloseableIterator<ColumnarBatch> currentFileReader;
+
+            {
+                this.batchReader =
+                        new DeltaParquetFileReader(DeltaParquetHandler.this.fileIO, DeltaParquetHandler.this.conf);
+            }
+
+            public void close() throws IOException {
+                Utils.closeCloseables(new AutoCloseable[] { this.currentFileReader, splits });
+            }
+
+            public boolean hasNext() {
+                if (this.currentFileReader != null && this.currentFileReader.hasNext()) {
+                    return true;
+                } else {
+                    Utils.closeCloseables(new AutoCloseable[] { this.currentFileReader });
+                    this.currentFileReader = null;
+                    if (splits.hasNext()) {
+                        this.currentFileReader = this.batchReader.read(splits.next(), physicalSchema, predicate);
+                        return this.hasNext();
+                    } else {
+                        return false;
+                    }
+                }
+            }
+
+            public ColumnarBatch next() {
+                return this.currentFileReader.next();
+            }
+        };
+    }
+}
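
readParquetSplits() chains per-split readers lazily: a reader is opened only
once the previous split is drained, so at most one split is held open per
partition. The same pattern in miniature (generic sketch, hypothetical names):

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.function.Function;

    public class LazyConcat {
        // Open one sub-iterator at a time; advance to the next split only
        // after the current one is exhausted.
        public static <S, T> Iterator<T> lazyConcat(Iterator<S> splits, Function<S, Iterator<T>> open) {
            return new Iterator<>() {
                private Iterator<T> current;

                public boolean hasNext() {
                    while ((current == null || !current.hasNext()) && splits.hasNext()) {
                        current = open.apply(splits.next());
                    }
                    return current != null && current.hasNext();
                }

                public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return current.next();
                }
            };
        }
    }
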
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 03285efbe9..9ac004386b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -25,12 +25,14 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 
 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -45,6 +47,7 @@ import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -55,6 +58,8 @@ import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 
 import io.delta.kernel.Scan;
 import io.delta.kernel.Snapshot;
@@ -73,13 +78,14 @@ import io.delta.kernel.utils.FileStatus;
 
 public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private static final Logger LOGGER = LogManager.getLogger();
     private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
     private String scanState;
     protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
     protected ConfFactory confFactory;
     private String filterExpressionStr;
+    private boolean usingSplits;
 
     public List<PartitionWorkLoadBasedOnSize> getPartitionWorkLoadsBasedOnSize() {
         return partitionWorkLoadsBasedOnSize;
@@ -157,12 +163,43 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object>
             scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
             scanFiles = getScanFiles(scan, engine);
         }
+        int numPartitions = getPartitionConstraint().getLocations().length;
         LOGGER.info("Number of delta table parquet data files to scan: {}", scanFiles.size());
         configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
-        distributeFiles(scanFiles, getPartitionConstraint().getLocations().length);
+        try {
+            usingSplits = getFileSplitsConfig(configuration, appCtx);
+            if (usingSplits) {
+                distributeSplits(scanFiles, conf, numPartitions);
+            } else {
+                distributeFiles(scanFiles, numPartitions);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
         issueWarnings(warnings, warningCollector);
     }
 
+    private boolean getFileSplitsConfig(Map<String, String> configuration, ICcApplicationContext appCtx) {
+        String fileSplits = configuration.get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY);
+        return fileSplits != null ? Boolean.parseBoolean(fileSplits)
+                : appCtx.getCompilerProperties().isDeltaLakeFileSplitsEnabled();
+    }
+
+    private List<SerializableFileSplit> getInputSplits(Row file, JobConf conf) throws IOException {
+        List<SerializableFileSplit> inputSplits = new ArrayList<>();
+        FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(file);
+        Path parquetPath = new Path(fileStatus.getPath());
+        try (ParquetFileReader reader = ParquetFileReader.open(conf, parquetPath)) {
+            List<BlockMetaData> blocks = reader.getFooter().getBlocks();
+            for (BlockMetaData block : blocks) {
+                long start = block.getStartingPos();
+                long length = block.getCompressedSize();
+                inputSplits.add(new SerializableFileSplit(parquetPath, start, length, conf));
+            }
+        }
+        return inputSplits;
+    }
+
     private List<Row> getScanFiles(Scan scan, Engine engine) {
         List<Row> scanFiles = new ArrayList<>();
         CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
@@ -195,10 +232,9 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object>
     public void distributeFiles(List<Row> scanFiles, int partitionsCount) {
         PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
                 Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
-
         // Prepare the workloads based on the number of partitions
         for (int i = 0; i < partitionsCount; i++) {
-            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize(false));
         }
         for (Row scanFileRow : scanFiles) {
             PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
@@ -209,12 +245,37 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object>
         partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
     }
 
+    private void distributeSplits(List<Row> scanFiles, JobConf conf, int partitionsCount) throws IOException {
+        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+        // Prepare the workloads based on the number of partitions
+        for (int i = 0; i < partitionsCount; i++) {
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize(true));
+        }
+        for (Row scanFile : scanFiles) {
+            String scanFileJson = RowSerDe.serializeRowToJson(scanFile);
+            List<SerializableFileSplit> splits = getInputSplits(scanFile, conf);
+            // Distribute splits across partitions
+            for (int i = 0; i < splits.size(); i++) {
+                PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+                workload.addScanFileSplit(scanFileJson, splits.get(i));
+                workloadQueue.add(workload);
+            }
+        }
+        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
+    }
+
     @Override
     public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
         try {
             int partition = context.getPartition();
-            return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
-                    confFactory, filterExpressionStr);
+            if (usingSplits) {
+                return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFileSplits(),
+                        scanState, confFactory, filterExpressionStr);
+            } else {
+                return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
+                        confFactory, filterExpressionStr);
+            }
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -231,29 +292,45 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object>
     }
 
     public static class PartitionWorkLoadBasedOnSize implements Serializable {
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = 2L;
         private final List<String> scanFiles = new ArrayList<>();
+        private final Map<String, List<SerializableFileSplit>> scanFileSplits = new HashMap<>();
+        private final boolean usingSplits;
         private long totalSize = 0;
 
-        public PartitionWorkLoadBasedOnSize() {
+        public PartitionWorkLoadBasedOnSize(boolean usingSplits) {
+            this.usingSplits = usingSplits;
         }
 
         public List<String> getScanFiles() {
             return scanFiles;
         }
 
+        public Map<String, List<SerializableFileSplit>> getScanFileSplits() {
+            return scanFileSplits;
+        }
+
+        public boolean isUsingSplits() {
+            return usingSplits;
+        }
+
         public void addScanFile(String scanFile, long size) {
             this.scanFiles.add(scanFile);
             this.totalSize += size;
         }
 
+        public void addScanFileSplit(String scanFile, SerializableFileSplit split) {
+            this.totalSize += split.getLength();
+            this.scanFileSplits.computeIfAbsent(scanFile, k -> new ArrayList<>()).add(split);
+        }
+
         public long getTotalSize() {
             return totalSize;
         }
 
         @Override
         public String toString() {
-            return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
+            return "Files: " + (usingSplits ? scanFileSplits.size() : scanFiles.size()) + ", Total Size: " + totalSize;
         }
     }
 
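distributeSplits() balances by size the same way distributeFiles() always has:
poll the lightest partition from a min-heap, add the item, push it back. The
greedy loop in isolation (sketch; each heap entry is a {partitionId,
totalBytes} pair):

    import java.util.Comparator;
    import java.util.PriorityQueue;

    public class GreedyBalance {
        // The next item always goes to the partition with the smallest
        // running total, keeping per-partition byte counts roughly even.
        public static long[][] assign(long[] itemSizes, int partitions) {
            PriorityQueue<long[]> heap = new PriorityQueue<>(Comparator.comparingLong((long[] a) -> a[1]));
            for (int p = 0; p < partitions; p++) {
                heap.add(new long[] { p, 0L });
            }
            for (long size : itemSizes) {
                long[] lightest = heap.poll();
                lightest[1] += size;
                heap.add(lightest);
            }
            return heap.toArray(new long[0][]);
        }
    }
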
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/SerializableFileSplit.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/SerializableFileSplit.java
new file mode 100644
index 0000000000..445cb15e4e
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/SerializableFileSplit.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+public class SerializableFileSplit extends FileSplit implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public SerializableFileSplit(Path file, long start, long length, JobConf conf) {
+        super(file, start, length, conf);
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        this.write(out);
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        this.readFields(in);
+    }
+}
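
Hadoop's mapred FileSplit is Writable but not java.io.Serializable, which is
why the writeObject/readObject hooks above delegate to write()/readFields(). A
quick round-trip check (hypothetical path and harness):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import org.apache.asterix.external.input.record.reader.aws.delta.SerializableFileSplit;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class SplitRoundTrip {
        public static void main(String[] args) throws Exception {
            SerializableFileSplit split = new SerializableFileSplit(
                    new Path("s3a://bucket/table/part-00000.parquet"), 4L, 1024L, new JobConf());
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(split); // invokes the private writeObject -> FileSplit.write()
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
                SerializableFileSplit copy = (SerializableFileSplit) in.readObject(); // readFields()
                System.out.println(copy.getPath() + " @ " + copy.getStart() + " + " + copy.getLength());
            }
        }
    }
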
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index b961421db2..f7bf0f6e67 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.common.metadata.DataverseName;
@@ -139,6 +140,7 @@ public class DatasetDataSource extends DataSource {
                         addExternalProjectionInfo(projectionFiltrationInfo, edd.getProperties());
                 properties = addSubPath(externalDataSource.getProperties(), properties);
                 properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE, String.valueOf(externalScanBufferSize));
+                setExternalCollectionCompilerProperties(metadataProvider, properties);
                 IExternalFilterEvaluatorFactory filterEvaluatorFactory = metadataProvider
                         .createExternalFilterEvaluatorFactory(context, typeEnv, projectionFiltrationInfo, properties);
                 ITypedAdapterFactory adapterFactory =
@@ -223,4 +225,13 @@ public class DatasetDataSource extends DataSource {
     public boolean isScanAccessPathALeaf() {
         return dataset.getDatasetType() == DatasetType.EXTERNAL;
     }
+
+    private void setExternalCollectionCompilerProperties(MetadataProvider metadataProvider,
+            Map<String, String> configuration) {
+        String fileSplits =
+                (String) metadataProvider.getConfig().get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY);
+        if (fileSplits != null) {
+            configuration.put(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY, fileSplits);
+        }
+    }
 }
