This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit bfe4b3a7a2c4091ec9a2daa1de724a9b8d547395
Author: Peeyush Gupta <[email protected]>
AuthorDate: Tue Jul 22 12:19:12 2025 -0700

    [NO ISSUE][EXT] Upgrade Delta Kernel API to v4.0.0
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Ext-ref: MB-67760
    Change-Id: I283206f278ec713a517b7f02016bcb5b321e65fb
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20131
    Reviewed-by: Michael Blow <[email protected]>
    Tested-by: Michael Blow <[email protected]>
---
 asterixdb/asterix-app/pom.xml                      |   5 +
 .../deltalake/DeltaTableGenerator.java             |  38 ++-
 asterixdb/asterix-external-data/pom.xml            |  13 +-
 .../reader/aws/delta/DefaultExpressionUtils.java   | 366 ---------------------
 .../input/record/reader/aws/delta/DeltaEngine.java |  40 ---
 .../reader/aws/delta/DeltaExpressionEvaluator.java | 339 -------------------
 .../reader/aws/delta/DeltaExpressionHandler.java   |  40 ---
 .../reader/aws/delta/DeltaFileRecordReader.java    | 215 +-----------
 .../reader/aws/delta/DeltaPredicateEvaluator.java  |  66 ----
 .../reader/aws/delta/DeltaReaderFactory.java       |  15 +-
 .../reader/aws/delta/ElementAtEvaluator.java       | 151 ---------
 .../record/reader/aws/delta/ExpressionVisitor.java | 136 --------
 .../reader/aws/delta/ImplicitCastExpression.java   | 267 ---------------
 .../reader/aws/delta/PartitionValueEvaluator.java  | 136 --------
 hyracks-fullstack/pom.xml                          |  10 +
 15 files changed, 59 insertions(+), 1778 deletions(-)

diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index b04c95716f..bd6b9feed8 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -1128,5 +1128,10 @@
       <artifactId>converter</artifactId>
       <version>0.2.15</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-runtime</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index a3b20f8616..a8f0b7b674 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -32,6 +32,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.parquet.avro.AvroParquetWriter;
@@ -133,8 +134,9 @@ public class DeltaTableGenerator {
                 writer.write(record);
             }
 
-            long size = writer.getDataSize();
+            FileSystem fs = FileSystem.getLocal(conf);
             writer.close();
+            long size = fs.getFileStatus(path).getLen();
 
             List<Action> actions = List.of(new AddFile("firstFile.parquet", 
new HashMap<>(), size,
                     System.currentTimeMillis(), true, null, null));
@@ -154,8 +156,8 @@ public class DeltaTableGenerator {
             for (GenericData.Record record : fileSecondSnapshotRecords) {
                 writer2.write(record);
             }
-            long size2 = writer2.getDataSize();
             writer2.close();
+            long size2 = fs.getFileStatus(path2).getLen();
             AddFile addFile = new AddFile("firstFile.parquet", new 
HashMap<>(), size, System.currentTimeMillis(), true,
                     null, null);
             RemoveFile removeFile = addFile.remove();
@@ -201,8 +203,9 @@ public class DeltaTableGenerator {
                 writer.write(record);
             }
 
-            long size = writer.getDataSize();
+            FileSystem fs = FileSystem.getLocal(conf);
             writer.close();
+            long size = fs.getFileStatus(path).getLen();
 
             List<Action> actions = List.of(new AddFile("firstFile.parquet", 
new HashMap<>(), size,
                     System.currentTimeMillis(), true, null, null));
@@ -224,8 +227,8 @@ public class DeltaTableGenerator {
                 writer2.write(record);
             }
 
-            long size2 = writer2.getDataSize();
             writer2.close();
+            long size2 = fs.getFileStatus(path2).getLen();
 
             List<Action> actions2 = List.of(new AddFile("secondFile.parquet", 
new HashMap<>(), size2,
                     System.currentTimeMillis(), true, null, null));
@@ -252,9 +255,9 @@ public class DeltaTableGenerator {
             for (GenericData.Record record : fileFirstSnapshotRecords) {
                 writer.write(record);
             }
-
-            long size = writer.getDataSize();
+            FileSystem fs = FileSystem.getLocal(conf);
             writer.close();
+            long size = fs.getFileStatus(path).getLen();
 
             List<Action> actions = List.of(new AddFile("firstFile.parquet", 
new HashMap<>(), size,
                     System.currentTimeMillis(), true, null, null));
@@ -325,8 +328,9 @@ public class DeltaTableGenerator {
                 writer.write(record);
             }
 
-            long size = writer.getDataSize();
+            FileSystem fs = FileSystem.getLocal(conf);
             writer.close();
+            long size = fs.getFileStatus(path).getLen();
 
             List<Action> actions = List.of(new AddFile("firstFile.parquet", 
new HashMap<>(), size,
                     System.currentTimeMillis(), true, null, null));
@@ -348,8 +352,8 @@ public class DeltaTableGenerator {
                     writer2.write(record);
                 }
 
-                long size2 = writer2.getDataSize();
                 writer2.close();
+                long size2 = fs.getFileStatus(path2).getLen();
 
                 List<Action> actions2 = List.of(new AddFile("File" + i + 
".parquet", new HashMap<>(), size2,
                         System.currentTimeMillis(), true, null, null));
@@ -426,8 +430,9 @@ public class DeltaTableGenerator {
             for (GenericData.Record record : fileFirstSnapshotRecords) {
                 writer.write(record);
             }
-            long size = writer.getDataSize();
+            FileSystem fs = FileSystem.getLocal(conf);
             writer.close();
+            long size = fs.getFileStatus(path).getLen();
 
             Path path2 = new Path(DELTA_PARTITIONED_TABLE, 
"secondFile.parquet");
             ParquetWriter<GenericData.Record> writer2 =
@@ -435,8 +440,8 @@ public class DeltaTableGenerator {
             for (GenericData.Record record : fileSecondSnapshotRecords) {
                 writer2.write(record);
             }
-            long size2 = writer2.getDataSize();
             writer2.close();
+            long size2 = fs.getFileStatus(path2).getLen();
 
             Path path3 = new Path(DELTA_PARTITIONED_TABLE, 
"thirdFile.parquet");
             ParquetWriter<GenericData.Record> writer3 =
@@ -444,8 +449,8 @@ public class DeltaTableGenerator {
             for (GenericData.Record record : fileThirdSnapshotRecords) {
                 writer3.write(record);
             }
-            long size3 = writer3.getDataSize();
             writer3.close();
+            long size3 = fs.getFileStatus(path3).getLen();
 
             Path path4 = new Path(DELTA_PARTITIONED_TABLE, 
"fourthFile.parquet");
             ParquetWriter<GenericData.Record> writer4 =
@@ -453,8 +458,8 @@ public class DeltaTableGenerator {
             for (GenericData.Record record : fileFourthSnapshotRecords) {
                 writer4.write(record);
             }
-            long size4 = writer4.getDataSize();
             writer4.close();
+            long size4 = fs.getFileStatus(path4).getLen();
 
             DeltaLog log = DeltaLog.forTable(conf, DELTA_PARTITIONED_TABLE);
             OptimisticTransaction txn = log.startTransaction();
@@ -557,8 +562,9 @@ public class DeltaTableGenerator {
             for (GenericData.Record record : fileFirstSnapshotRecords) {
                 writer.write(record);
             }
-            long size = writer.getDataSize();
+            FileSystem fs = FileSystem.getLocal(conf);
             writer.close();
+            long size = fs.getFileStatus(path).getLen();
 
             Path path2 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, 
"secondFile.parquet");
             ParquetWriter<GenericData.Record> writer2 =
@@ -566,8 +572,8 @@ public class DeltaTableGenerator {
             for (GenericData.Record record : fileSecondSnapshotRecords) {
                 writer2.write(record);
             }
-            long size2 = writer2.getDataSize();
             writer2.close();
+            long size2 = fs.getFileStatus(path2).getLen();
 
             Path path3 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, 
"thirdFile.parquet");
             ParquetWriter<GenericData.Record> writer3 =
@@ -575,8 +581,8 @@ public class DeltaTableGenerator {
             for (GenericData.Record record : fileThirdSnapshotRecords) {
                 writer3.write(record);
             }
-            long size3 = writer3.getDataSize();
             writer3.close();
+            long size3 = fs.getFileStatus(path3).getLen();
 
             Path path4 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, 
"fourthFile.parquet");
             ParquetWriter<GenericData.Record> writer4 =
@@ -584,8 +590,8 @@ public class DeltaTableGenerator {
             for (GenericData.Record record : fileFourthSnapshotRecords) {
                 writer4.write(record);
             }
-            long size4 = writer4.getDataSize();
             writer4.close();
+            long size4 = fs.getFileStatus(path4).getLen();
 
             DeltaLog log = DeltaLog.forTable(conf, 
DELTA_TIMESTAMP_PARTITIONED_TABLE);
             OptimisticTransaction txn = log.startTransaction();
diff --git a/asterixdb/asterix-external-data/pom.xml 
b/asterixdb/asterix-external-data/pom.xml
index b189177389..ae894d5f55 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -579,12 +579,12 @@
     <dependency>
       <groupId>io.delta</groupId>
       <artifactId>delta-kernel-api</artifactId>
-      <version>3.2.1</version>
+      <version>4.0.0</version>
     </dependency>
     <dependency>
       <groupId>io.delta</groupId>
       <artifactId>delta-kernel-defaults</artifactId>
-      <version>3.2.1</version>
+      <version>4.0.0</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -596,6 +596,15 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.datatype</groupId>
+      <artifactId>jackson-datatype-jdk8</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-runtime</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>io.delta</groupId>
       <artifactId>delta-standalone_2.12</artifactId>
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java
deleted file mode 100644
index 560360c840..0000000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.aws.delta;
-
-import static io.delta.kernel.internal.util.Preconditions.checkArgument;
-
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.util.Comparator;
-import java.util.List;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import io.delta.kernel.data.ArrayValue;
-import io.delta.kernel.data.ColumnVector;
-import io.delta.kernel.data.MapValue;
-import io.delta.kernel.expressions.Expression;
-import io.delta.kernel.internal.util.Utils;
-import io.delta.kernel.types.BinaryType;
-import io.delta.kernel.types.BooleanType;
-import io.delta.kernel.types.ByteType;
-import io.delta.kernel.types.DataType;
-import io.delta.kernel.types.DateType;
-import io.delta.kernel.types.DecimalType;
-import io.delta.kernel.types.DoubleType;
-import io.delta.kernel.types.FloatType;
-import io.delta.kernel.types.IntegerType;
-import io.delta.kernel.types.LongType;
-import io.delta.kernel.types.ShortType;
-import io.delta.kernel.types.StringType;
-import io.delta.kernel.types.TimestampNTZType;
-import io.delta.kernel.types.TimestampType;
-
-/**
- * Utility methods used by the default expression evaluator.
- */
-class DefaultExpressionUtils {
-    private DefaultExpressionUtils() {
-    }
-
-    static final Comparator<String> STRING_COMPARATOR = (leftOp, rightOp) -> {
-        byte[] leftBytes = leftOp.getBytes(StandardCharsets.UTF_8);
-        byte[] rightBytes = rightOp.getBytes(StandardCharsets.UTF_8);
-        int i = 0;
-        while (i < leftBytes.length && i < rightBytes.length) {
-            if (leftBytes[i] != rightBytes[i]) {
-                return Byte.toUnsignedInt(leftBytes[i]) - 
Byte.toUnsignedInt(rightBytes[i]);
-            }
-            i++;
-        }
-        return Integer.compare(leftBytes.length, rightBytes.length);
-    };
-
-    static final Comparator<byte[]> BINARY_COMPARTOR = (leftOp, rightOp) -> {
-        int i = 0;
-        while (i < leftOp.length && i < rightOp.length) {
-            if (leftOp[i] != rightOp[i]) {
-                return Byte.toUnsignedInt(leftOp[i]) - 
Byte.toUnsignedInt(rightOp[i]);
-            }
-            i++;
-        }
-        return Integer.compare(leftOp.length, rightOp.length);
-    };
-
-    /**
-     * Utility method that calculates the nullability result from given two 
vectors. Result is
-     * null if at least one side is a null.
-     */
-    static boolean[] evalNullability(ColumnVector left, ColumnVector right) {
-        int numRows = left.getSize();
-        boolean[] nullability = new boolean[numRows];
-        for (int rowId = 0; rowId < numRows; rowId++) {
-            nullability[rowId] = left.isNullAt(rowId) || right.isNullAt(rowId);
-        }
-        return nullability;
-    }
-
-    /**
-     * Wraps a child vector as a boolean {@link ColumnVector} with the given 
value and nullability
-     * accessors.
-     */
-    static ColumnVector booleanWrapperVector(ColumnVector childVector, 
Function<Integer, Boolean> valueAccessor,
-            Function<Integer, Boolean> nullabilityAccessor) {
-
-        return new ColumnVector() {
-
-            @Override
-            public DataType getDataType() {
-                return BooleanType.BOOLEAN;
-            }
-
-            @Override
-            public int getSize() {
-                return childVector.getSize();
-            }
-
-            @Override
-            public void close() {
-                childVector.close();
-            }
-
-            @Override
-            public boolean isNullAt(int rowId) {
-                return nullabilityAccessor.apply(rowId);
-            }
-
-            @Override
-            public boolean getBoolean(int rowId) {
-                return valueAccessor.apply(rowId);
-            }
-        };
-    }
-
-    /**
-     * Utility method to compare the left and right according to the natural 
ordering
-     * and return an integer array where each row contains the comparison 
result (-1, 0, 1) for
-     * corresponding rows in the input vectors compared.
-     * <p>
-     * Only primitive data types are supported.
-     */
-    static int[] compare(ColumnVector left, ColumnVector right) {
-        checkArgument(left.getSize() == right.getSize(), "Left and right 
operand have different vector sizes.");
-        DataType dataType = left.getDataType();
-
-        int numRows = left.getSize();
-        int[] result = new int[numRows];
-        if (dataType instanceof BooleanType) {
-            compareBoolean(left, right, result);
-        } else if (dataType instanceof ByteType) {
-            compareByte(left, right, result);
-        } else if (dataType instanceof ShortType) {
-            compareShort(left, right, result);
-        } else if (dataType instanceof IntegerType || dataType instanceof 
DateType) {
-            compareInt(left, right, result);
-        } else if (dataType instanceof LongType || dataType instanceof 
TimestampType
-                || dataType instanceof TimestampNTZType) {
-            compareLong(left, right, result);
-        } else if (dataType instanceof FloatType) {
-            compareFloat(left, right, result);
-        } else if (dataType instanceof DoubleType) {
-            compareDouble(left, right, result);
-        } else if (dataType instanceof DecimalType) {
-            compareDecimal(left, right, result);
-        } else if (dataType instanceof StringType) {
-            compareString(left, right, result);
-        } else if (dataType instanceof BinaryType) {
-            compareBinary(left, right, result);
-        } else {
-            throw new UnsupportedOperationException(dataType + " can not be 
compared.");
-        }
-        return result;
-    }
-
-    static void compareBoolean(ColumnVector left, ColumnVector right, int[] 
result) {
-        for (int rowId = 0; rowId < left.getSize(); rowId++) {
-            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
-                result[rowId] = Boolean.compare(left.getBoolean(rowId), 
right.getBoolean(rowId));
-            }
-        }
-    }
-
-    static void compareByte(ColumnVector left, ColumnVector right, int[] 
result) {
-        for (int rowId = 0; rowId < left.getSize(); rowId++) {
-            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
-                result[rowId] = Byte.compare(left.getByte(rowId), 
right.getByte(rowId));
-            }
-        }
-    }
-
-    static void compareShort(ColumnVector left, ColumnVector right, int[] 
result) {
-        for (int rowId = 0; rowId < left.getSize(); rowId++) {
-            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
-                result[rowId] = Short.compare(left.getShort(rowId), 
right.getShort(rowId));
-            }
-        }
-    }
-
-    static void compareInt(ColumnVector left, ColumnVector right, int[] 
result) {
-        for (int rowId = 0; rowId < left.getSize(); rowId++) {
-            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
-                result[rowId] = Integer.compare(left.getInt(rowId), 
right.getInt(rowId));
-            }
-        }
-    }
-
-    static void compareLong(ColumnVector left, ColumnVector right, int[] 
result) {
-        for (int rowId = 0; rowId < left.getSize(); rowId++) {
-            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
-                result[rowId] = Long.compare(left.getLong(rowId), 
right.getLong(rowId));
-            }
-        }
-    }
-
-    static void compareFloat(ColumnVector left, ColumnVector right, int[] 
result) {
-        for (int rowId = 0; rowId < left.getSize(); rowId++) {
-            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
-                result[rowId] = Float.compare(left.getFloat(rowId), 
right.getFloat(rowId));
-            }
-        }
-    }
-
-    static void compareDouble(ColumnVector left, ColumnVector right, int[] 
result) {
-        for (int rowId = 0; rowId < left.getSize(); rowId++) {
-            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
-                result[rowId] = Double.compare(left.getDouble(rowId), 
right.getDouble(rowId));
-            }
-        }
-    }
-
-    static void compareString(ColumnVector left, ColumnVector right, int[] 
result) {
-        for (int rowId = 0; rowId < left.getSize(); rowId++) {
-            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
-                result[rowId] = 
STRING_COMPARATOR.compare(left.getString(rowId), right.getString(rowId));
-            }
-        }
-    }
-
-    static void compareDecimal(ColumnVector left, ColumnVector right, int[] 
result) {
-        Comparator<BigDecimal> comparator = Comparator.naturalOrder();
-        for (int rowId = 0; rowId < left.getSize(); rowId++) {
-            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
-                result[rowId] = comparator.compare(left.getDecimal(rowId), 
right.getDecimal(rowId));
-            }
-        }
-    }
-
-    static void compareBinary(ColumnVector left, ColumnVector right, int[] 
result) {
-        for (int rowId = 0; rowId < left.getSize(); rowId++) {
-            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
-                result[rowId] = 
BINARY_COMPARTOR.compare(left.getBinary(rowId), right.getBinary(rowId));
-            }
-        }
-    }
-
-    static Expression childAt(Expression expression, int index) {
-        return expression.getChildren().get(index);
-    }
-
-    /**
-     * Combines a list of column vectors into one column vector based on the 
resolution of
-     * idxToReturn
-     * @param vectors List of ColumnVectors of the same data type with length 
>= 1
-     * @param idxToReturn Function that takes in a rowId and returns the index 
of the column vector
-     *                    to use as the return value
-     */
-    static ColumnVector combinationVector(List<ColumnVector> vectors, 
Function<Integer, Integer> idxToReturn) {
-        return new ColumnVector() {
-            // Store the last lookup value to avoid multiple looks up for same 
rowId.
-            // The general pattern is call `isNullAt(rowId)` followed by 
`getBoolean(rowId)` or
-            // some other value accessor. So the cache of one value is enough.
-            private int lastLookupRowId = -1;
-            private ColumnVector lastLookupVector = null;
-
-            @Override
-            public DataType getDataType() {
-                return vectors.get(0).getDataType();
-            }
-
-            @Override
-            public int getSize() {
-                return vectors.get(0).getSize();
-            }
-
-            @Override
-            public void close() {
-                Utils.closeCloseables(vectors.toArray(new ColumnVector[0]));
-            }
-
-            @Override
-            public boolean isNullAt(int rowId) {
-                return getVector(rowId).isNullAt(rowId);
-            }
-
-            @Override
-            public boolean getBoolean(int rowId) {
-                return getVector(rowId).getBoolean(rowId);
-            }
-
-            @Override
-            public byte getByte(int rowId) {
-                return getVector(rowId).getByte(rowId);
-            }
-
-            @Override
-            public short getShort(int rowId) {
-                return getVector(rowId).getShort(rowId);
-            }
-
-            @Override
-            public int getInt(int rowId) {
-                return getVector(rowId).getInt(rowId);
-            }
-
-            @Override
-            public long getLong(int rowId) {
-                return getVector(rowId).getLong(rowId);
-            }
-
-            @Override
-            public float getFloat(int rowId) {
-                return getVector(rowId).getFloat(rowId);
-            }
-
-            @Override
-            public double getDouble(int rowId) {
-                return getVector(rowId).getDouble(rowId);
-            }
-
-            @Override
-            public byte[] getBinary(int rowId) {
-                return getVector(rowId).getBinary(rowId);
-            }
-
-            @Override
-            public String getString(int rowId) {
-                return getVector(rowId).getString(rowId);
-            }
-
-            @Override
-            public BigDecimal getDecimal(int rowId) {
-                return getVector(rowId).getDecimal(rowId);
-            }
-
-            @Override
-            public MapValue getMap(int rowId) {
-                return getVector(rowId).getMap(rowId);
-            }
-
-            @Override
-            public ArrayValue getArray(int rowId) {
-                return getVector(rowId).getArray(rowId);
-            }
-
-            @Override
-            public ColumnVector getChild(int ordinal) {
-                return combinationVector(vectors.stream().map(v -> 
v.getChild(ordinal)).collect(Collectors.toList()),
-                        idxToReturn);
-            }
-
-            private ColumnVector getVector(int rowId) {
-                if (rowId == lastLookupRowId) {
-                    return lastLookupVector;
-                }
-                lastLookupRowId = rowId;
-                lastLookupVector = vectors.get(idxToReturn.apply(rowId));
-                return lastLookupVector;
-            }
-        };
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
deleted file mode 100644
index 1b4e12e62b..0000000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.aws.delta;
-
-import org.apache.hadoop.conf.Configuration;
-
-import io.delta.kernel.defaults.engine.DefaultEngine;
-import io.delta.kernel.engine.ExpressionHandler;
-
-public class DeltaEngine extends DefaultEngine {
-
-    protected DeltaEngine(Configuration configuration) {
-        super(configuration);
-    }
-
-    @Override
-    public ExpressionHandler getExpressionHandler() {
-        return new DeltaExpressionHandler();
-    }
-
-    public static DeltaEngine create(Configuration configuration) {
-        return new DeltaEngine(configuration);
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java
deleted file mode 100644
index b3082b5e8a..0000000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.aws.delta;
-
-import static io.delta.kernel.internal.util.ExpressionUtils.getLeft;
-import static io.delta.kernel.internal.util.ExpressionUtils.getRight;
-import static io.delta.kernel.internal.util.ExpressionUtils.getUnaryChild;
-import static io.delta.kernel.internal.util.Preconditions.checkArgument;
-import static 
org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.booleanWrapperVector;
-import static 
org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.childAt;
-import static 
org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.compare;
-import static 
org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.evalNullability;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import io.delta.kernel.data.ColumnVector;
-import io.delta.kernel.data.ColumnarBatch;
-import io.delta.kernel.defaults.internal.data.vector.DefaultBooleanVector;
-import io.delta.kernel.defaults.internal.data.vector.DefaultConstantVector;
-import 
io.delta.kernel.defaults.internal.expressions.DefaultExpressionEvaluator;
-import io.delta.kernel.expressions.AlwaysFalse;
-import io.delta.kernel.expressions.AlwaysTrue;
-import io.delta.kernel.expressions.And;
-import io.delta.kernel.expressions.Column;
-import io.delta.kernel.expressions.Expression;
-import io.delta.kernel.expressions.Literal;
-import io.delta.kernel.expressions.Or;
-import io.delta.kernel.expressions.PartitionValueExpression;
-import io.delta.kernel.expressions.Predicate;
-import io.delta.kernel.expressions.ScalarExpression;
-import io.delta.kernel.types.BinaryType;
-import io.delta.kernel.types.BooleanType;
-import io.delta.kernel.types.ByteType;
-import io.delta.kernel.types.DataType;
-import io.delta.kernel.types.DateType;
-import io.delta.kernel.types.DecimalType;
-import io.delta.kernel.types.DoubleType;
-import io.delta.kernel.types.FloatType;
-import io.delta.kernel.types.IntegerType;
-import io.delta.kernel.types.LongType;
-import io.delta.kernel.types.ShortType;
-import io.delta.kernel.types.StringType;
-import io.delta.kernel.types.StructType;
-import io.delta.kernel.types.TimestampNTZType;
-import io.delta.kernel.types.TimestampType;
-
-public class DeltaExpressionEvaluator extends DefaultExpressionEvaluator {
-
-    private final Expression expression;
-
-    public DeltaExpressionEvaluator(StructType structType, Expression 
expression, DataType dataType) {
-        super(structType, expression, dataType);
-        this.expression = expression;
-    }
-
-    @Override
-    public ColumnVector eval(ColumnarBatch input) {
-        return new ExpressionEvalVisitor(input).visit(expression);
-    }
-
-    /**
-     * Implementation of {@link ExpressionVisitor} to evaluate expression on a
-     * {@link ColumnarBatch}.
-     */
-    private static class ExpressionEvalVisitor extends 
ExpressionVisitor<ColumnVector> {
-        private final ColumnarBatch input;
-
-        ExpressionEvalVisitor(ColumnarBatch input) {
-            this.input = input;
-        }
-
-        /*
-        | Operand 1 | Operand 2 | `AND`      | `OR`       |
-        |-----------|-----------|------------|------------|
-        | True      | True      | True       | True       |
-        | True      | False     | False      | True       |
-        | True      | NULL      | NULL       | True       |
-        | False     | True      | False      | True       |
-        | False     | False     | False      | False      |
-        | False     | NULL      | False      | NULL       |
-        | NULL      | True      | NULL       | True       |
-        | NULL      | False     | False      | NULL       |
-        | NULL      | NULL      | NULL       | NULL       |
-         */
-        @Override
-        ColumnVector visitAnd(And and) {
-            PredicateChildrenEvalResult argResults = 
evalBinaryExpressionChildren(and);
-            ColumnVector left = argResults.leftResult;
-            ColumnVector right = argResults.rightResult;
-            int numRows = argResults.rowCount;
-            boolean[] result = new boolean[numRows];
-            boolean[] nullability = new boolean[numRows];
-            for (int rowId = 0; rowId < numRows; rowId++) {
-                boolean leftIsTrue = !left.isNullAt(rowId) && 
left.getBoolean(rowId);
-                boolean rightIsTrue = !right.isNullAt(rowId) && 
right.getBoolean(rowId);
-                boolean leftIsFalse = !left.isNullAt(rowId) && 
!left.getBoolean(rowId);
-                boolean rightIsFalse = !right.isNullAt(rowId) && 
!right.getBoolean(rowId);
-
-                if (leftIsFalse || rightIsFalse) {
-                    nullability[rowId] = false;
-                    result[rowId] = false;
-                } else if (leftIsTrue && rightIsTrue) {
-                    nullability[rowId] = false;
-                    result[rowId] = true;
-                } else {
-                    nullability[rowId] = true;
-                    // result[rowId] is undefined when nullability[rowId] = 
true
-                }
-            }
-            return new DefaultBooleanVector(numRows, Optional.of(nullability), 
result);
-        }
-
-        @Override
-        ColumnVector visitOr(Or or) {
-            PredicateChildrenEvalResult argResults = 
evalBinaryExpressionChildren(or);
-            ColumnVector left = argResults.leftResult;
-            ColumnVector right = argResults.rightResult;
-            int numRows = argResults.rowCount;
-            boolean[] result = new boolean[numRows];
-            boolean[] nullability = new boolean[numRows];
-            for (int rowId = 0; rowId < numRows; rowId++) {
-                boolean leftIsTrue = !left.isNullAt(rowId) && 
left.getBoolean(rowId);
-                boolean rightIsTrue = !right.isNullAt(rowId) && 
right.getBoolean(rowId);
-                boolean leftIsFalse = !left.isNullAt(rowId) && 
!left.getBoolean(rowId);
-                boolean rightIsFalse = !right.isNullAt(rowId) && 
!right.getBoolean(rowId);
-
-                if (leftIsTrue || rightIsTrue) {
-                    nullability[rowId] = false;
-                    result[rowId] = true;
-                } else if (leftIsFalse && rightIsFalse) {
-                    nullability[rowId] = false;
-                    result[rowId] = false;
-                } else {
-                    nullability[rowId] = true;
-                    // result[rowId] is undefined when nullability[rowId] = 
true
-                }
-            }
-            return new DefaultBooleanVector(numRows, Optional.of(nullability), 
result);
-        }
-
-        @Override
-        ColumnVector visitAlwaysTrue(AlwaysTrue alwaysTrue) {
-            return new DefaultConstantVector(BooleanType.BOOLEAN, 
input.getSize(), true);
-        }
-
-        @Override
-        ColumnVector visitAlwaysFalse(AlwaysFalse alwaysFalse) {
-            return new DefaultConstantVector(BooleanType.BOOLEAN, 
input.getSize(), false);
-        }
-
-        @Override
-        ColumnVector visitComparator(Predicate predicate) {
-            PredicateChildrenEvalResult argResults = 
evalBinaryExpressionChildren(predicate);
-
-            int numRows = argResults.rowCount;
-            boolean[] result = new boolean[numRows];
-            boolean[] nullability = evalNullability(argResults.leftResult, 
argResults.rightResult);
-            int[] compareResult = compare(argResults.leftResult, 
argResults.rightResult);
-            switch (predicate.getName()) {
-                case "=":
-                    for (int rowId = 0; rowId < numRows; rowId++) {
-                        result[rowId] = compareResult[rowId] == 0;
-                    }
-                    break;
-                case ">":
-                    for (int rowId = 0; rowId < numRows; rowId++) {
-                        result[rowId] = compareResult[rowId] > 0;
-                    }
-                    break;
-                case ">=":
-                    for (int rowId = 0; rowId < numRows; rowId++) {
-                        result[rowId] = compareResult[rowId] >= 0;
-                    }
-                    break;
-                case "<":
-                    for (int rowId = 0; rowId < numRows; rowId++) {
-                        result[rowId] = compareResult[rowId] < 0;
-                    }
-                    break;
-                case "<=":
-                    for (int rowId = 0; rowId < numRows; rowId++) {
-                        result[rowId] = compareResult[rowId] <= 0;
-                    }
-                    break;
-                default:
-                    // We should never reach this based on the 
ExpressionVisitor
-                    throw new IllegalStateException(
-                            String.format("%s is not a recognized comparator", 
predicate.getName()));
-            }
-
-            return new DefaultBooleanVector(numRows, Optional.of(nullability), 
result);
-        }
-
-        @Override
-        ColumnVector visitLiteral(Literal literal) {
-            DataType dataType = literal.getDataType();
-            if (dataType instanceof BooleanType || dataType instanceof 
ByteType || dataType instanceof ShortType
-                    || dataType instanceof IntegerType || dataType instanceof 
LongType || dataType instanceof FloatType
-                    || dataType instanceof DoubleType || dataType instanceof 
StringType
-                    || dataType instanceof BinaryType || dataType instanceof 
DecimalType || dataType instanceof DateType
-                    || dataType instanceof TimestampType || dataType 
instanceof TimestampNTZType) {
-                return new DefaultConstantVector(dataType, input.getSize(), 
literal.getValue());
-            }
-
-            throw new UnsupportedOperationException("unsupported expression 
encountered: " + literal);
-        }
-
-        @Override
-        ColumnVector visitColumn(Column column) {
-            String[] names = column.getNames();
-            DataType currentType = input.getSchema();
-            ColumnVector columnVector = null;
-            for (int level = 0; level < names.length; level++) {
-                assertColumnExists(currentType instanceof StructType, 
input.getSchema(), column);
-                StructType structSchema = ((StructType) currentType);
-                int ordinal = structSchema.indexOf(names[level]);
-                assertColumnExists(ordinal != -1, input.getSchema(), column);
-                currentType = structSchema.at(ordinal).getDataType();
-
-                if (level == 0) {
-                    columnVector = input.getColumnVector(ordinal);
-                } else {
-                    columnVector = columnVector.getChild(ordinal);
-                }
-            }
-            assertColumnExists(columnVector != null, input.getSchema(), 
column);
-            return columnVector;
-        }
-
-        @Override
-        ColumnVector visitCast(ImplicitCastExpression cast) {
-            ColumnVector inputResult = visit(cast.getInput());
-            return cast.eval(inputResult);
-        }
-
-        @Override
-        ColumnVector visitPartitionValue(PartitionValueExpression 
partitionValue) {
-            ColumnVector input = visit(partitionValue.getInput());
-            return PartitionValueEvaluator.eval(input, 
partitionValue.getDataType());
-        }
-
-        @Override
-        ColumnVector visitElementAt(ScalarExpression elementAt) {
-            ColumnVector map = visit(childAt(elementAt, 0));
-            ColumnVector lookupKey = visit(childAt(elementAt, 1));
-            return ElementAtEvaluator.eval(map, lookupKey);
-        }
-
-        @Override
-        ColumnVector visitNot(Predicate predicate) {
-            ColumnVector childResult = visit(childAt(predicate, 0));
-            return booleanWrapperVector(childResult, rowId -> 
!childResult.getBoolean(rowId),
-                    rowId -> childResult.isNullAt(rowId));
-        }
-
-        @Override
-        ColumnVector visitIsNotNull(Predicate predicate) {
-            ColumnVector childResult = visit(childAt(predicate, 0));
-            return booleanWrapperVector(childResult, rowId -> 
!childResult.isNullAt(rowId), rowId -> false);
-        }
-
-        @Override
-        ColumnVector visitIsNull(Predicate predicate) {
-            ColumnVector childResult = visit(getUnaryChild(predicate));
-            return booleanWrapperVector(childResult, rowId -> 
childResult.isNullAt(rowId), rowId -> false);
-        }
-
-        @Override
-        ColumnVector visitCoalesce(ScalarExpression coalesce) {
-            List<ColumnVector> childResults =
-                    
coalesce.getChildren().stream().map(this::visit).collect(Collectors.toList());
-            return DefaultExpressionUtils.combinationVector(childResults, 
rowId -> {
-                for (int idx = 0; idx < childResults.size(); idx++) {
-                    if (!childResults.get(idx).isNullAt(rowId)) {
-                        return idx;
-                    }
-                }
-                return 0; // If all are null then any idx suffices
-            });
-        }
-
-        /**
-         * Utility method to evaluate inputs to the binary input expression. 
Also validates the
-         * evaluated expression result {@link ColumnVector}s are of the same 
size.
-         *
-         * @param predicate
-         * @return Triplet of (result vector size, left operand result, left 
operand result)
-         */
-        private PredicateChildrenEvalResult 
evalBinaryExpressionChildren(Predicate predicate) {
-            ColumnVector left = visit(getLeft(predicate));
-            ColumnVector right = visit(getRight(predicate));
-            checkArgument(left.getSize() == right.getSize(),
-                    "Left and right operand returned different results: 
left=%d, right=d", left.getSize(),
-                    right.getSize());
-            return new PredicateChildrenEvalResult(left.getSize(), left, 
right);
-        }
-    }
-
-    /**
-     * Encapsulates children expression result of binary input predicate
-     */
-    private static class PredicateChildrenEvalResult {
-        public final int rowCount;
-        public final ColumnVector leftResult;
-        public final ColumnVector rightResult;
-
-        PredicateChildrenEvalResult(int rowCount, ColumnVector leftResult, 
ColumnVector rightResult) {
-            this.rowCount = rowCount;
-            this.leftResult = leftResult;
-            this.rightResult = rightResult;
-        }
-    }
-
-    private static void assertColumnExists(boolean condition, StructType 
schema, Column column) {
-        if (!condition) {
-            throw new IllegalArgumentException(
-                    String.format("%s doesn't exist in input data schema: %s", 
column, schema));
-        }
-    }
-
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java
deleted file mode 100644
index d9f5251420..0000000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.aws.delta;
-
-import io.delta.kernel.defaults.engine.DefaultExpressionHandler;
-import io.delta.kernel.expressions.Expression;
-import io.delta.kernel.expressions.ExpressionEvaluator;
-import io.delta.kernel.expressions.Predicate;
-import io.delta.kernel.expressions.PredicateEvaluator;
-import io.delta.kernel.types.DataType;
-import io.delta.kernel.types.StructType;
-
-public class DeltaExpressionHandler extends DefaultExpressionHandler {
-
-    @Override
-    public ExpressionEvaluator getEvaluator(StructType inputSchema, Expression 
expression, DataType outputType) {
-        return new DeltaExpressionEvaluator(inputSchema, expression, 
outputType);
-    }
-
-    @Override
-    public PredicateEvaluator getPredicateEvaluator(StructType inputSchema, 
Predicate predicate) {
-        return new DeltaPredicateEvaluator(inputSchema, predicate);
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 44dfdf006c..121a76b2e8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -22,14 +22,8 @@ import static 
io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -45,40 +39,15 @@ import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 
 import io.delta.kernel.Scan;
-import io.delta.kernel.data.ColumnVector;
 import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
-import io.delta.kernel.engine.ExpressionHandler;
-import io.delta.kernel.expressions.ExpressionEvaluator;
-import io.delta.kernel.expressions.Literal;
 import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.internal.InternalScanFileUtils;
-import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
 import io.delta.kernel.internal.data.ScanStateRow;
-import io.delta.kernel.internal.data.SelectionColumnVector;
-import io.delta.kernel.internal.deletionvectors.DeletionVectorUtils;
-import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray;
-import io.delta.kernel.internal.util.ColumnMapping;
-import io.delta.kernel.internal.util.InternalUtils;
-import io.delta.kernel.internal.util.Tuple2;
-import io.delta.kernel.types.BinaryType;
-import io.delta.kernel.types.BooleanType;
-import io.delta.kernel.types.ByteType;
-import io.delta.kernel.types.DataType;
-import io.delta.kernel.types.DateType;
-import io.delta.kernel.types.DecimalType;
-import io.delta.kernel.types.DoubleType;
-import io.delta.kernel.types.FloatType;
-import io.delta.kernel.types.IntegerType;
-import io.delta.kernel.types.LongType;
-import io.delta.kernel.types.ShortType;
-import io.delta.kernel.types.StringType;
-import io.delta.kernel.types.StructField;
 import io.delta.kernel.types.StructType;
-import io.delta.kernel.types.TimestampNTZType;
-import io.delta.kernel.types.TimestampType;
 import io.delta.kernel.utils.CloseableIterator;
 import io.delta.kernel.utils.FileStatus;
 
@@ -105,7 +74,7 @@ public class DeltaFileRecordReader implements 
IRecordReader<Row> {
     public DeltaFileRecordReader(List<String> serScanFiles, String 
serScanState, ConfFactory config,
             String filterExpressionStr) throws HyracksDataException {
         JobConf conf = config.getConf();
-        this.engine = DeltaEngine.create(conf);
+        this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
         for (String scanFile : serScanFiles) {
             this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
@@ -125,7 +94,7 @@ public class DeltaFileRecordReader implements 
IRecordReader<Row> {
             try {
                 this.physicalDataIter = engine.getParquetHandler()
                         
.readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, 
filterPredicate);
-                this.dataIter = transformPhysicalData(engine, scanState, 
scanFile, physicalDataIter);
+                this.dataIter = Scan.transformPhysicalData(engine, scanState, 
scanFile, physicalDataIter);
                 if (dataIter.hasNext()) {
                     rows = dataIter.next().getRows();
                 }
@@ -196,182 +165,4 @@ public class DeltaFileRecordReader implements 
IRecordReader<Row> {
     public boolean handleException(Throwable th) {
         return false;
     }
-
-    static CloseableIterator<FilteredColumnarBatch> 
transformPhysicalData(Engine engine, Row scanState, Row scanFile,
-            CloseableIterator<ColumnarBatch> physicalDataIter) throws 
IOException {
-        return new CloseableIterator<FilteredColumnarBatch>() {
-            boolean inited = false;
-
-            // initialized as part of init()
-            StructType physicalReadSchema = null;
-            StructType logicalReadSchema = null;
-            String tablePath = null;
-
-            RoaringBitmapArray currBitmap = null;
-            DeletionVectorDescriptor currDV = null;
-
-            private void initIfRequired() {
-                if (inited) {
-                    return;
-                }
-                physicalReadSchema = ScanStateRow.getPhysicalSchema(engine, 
scanState);
-                logicalReadSchema = ScanStateRow.getLogicalSchema(engine, 
scanState);
-
-                tablePath = ScanStateRow.getTableRoot(scanState);
-                inited = true;
-            }
-
-            @Override
-            public void close() throws IOException {
-                physicalDataIter.close();
-            }
-
-            @Override
-            public boolean hasNext() {
-                initIfRequired();
-                return physicalDataIter.hasNext();
-            }
-
-            @Override
-            public FilteredColumnarBatch next() {
-                initIfRequired();
-                ColumnarBatch nextDataBatch = physicalDataIter.next();
-
-                DeletionVectorDescriptor dv = 
InternalScanFileUtils.getDeletionVectorDescriptorFromRow(scanFile);
-
-                int rowIndexOrdinal = 
nextDataBatch.getSchema().indexOf(StructField.METADATA_ROW_INDEX_COLUMN_NAME);
-
-                // Get the selectionVector if DV is present
-                Optional<ColumnVector> selectionVector;
-                if (dv == null) {
-                    selectionVector = Optional.empty();
-                } else {
-                    if (rowIndexOrdinal == -1) {
-                        throw new IllegalArgumentException(
-                                "Row index column is not " + "present in the 
data read from the Parquet file.");
-                    }
-                    if (!dv.equals(currDV)) {
-                        Tuple2<DeletionVectorDescriptor, RoaringBitmapArray> 
dvInfo =
-                                DeletionVectorUtils.loadNewDvAndBitmap(engine, 
tablePath, dv);
-                        this.currDV = dvInfo._1;
-                        this.currBitmap = dvInfo._2;
-                    }
-                    ColumnVector rowIndexVector = 
nextDataBatch.getColumnVector(rowIndexOrdinal);
-                    selectionVector = Optional.of(new 
SelectionColumnVector(currBitmap, rowIndexVector));
-                }
-                if (rowIndexOrdinal != -1) {
-                    nextDataBatch = 
nextDataBatch.withDeletedColumnAt(rowIndexOrdinal);
-                }
-
-                // Add partition columns
-                nextDataBatch = 
withPartitionColumns(engine.getExpressionHandler(), nextDataBatch,
-                        InternalScanFileUtils.getPartitionValues(scanFile), 
physicalReadSchema);
-
-                // Change back to logical schema
-                String columnMappingMode = 
ScanStateRow.getColumnMappingMode(scanState);
-                switch (columnMappingMode) {
-                    case ColumnMapping.COLUMN_MAPPING_MODE_NAME:
-                    case ColumnMapping.COLUMN_MAPPING_MODE_ID:
-                        nextDataBatch = 
nextDataBatch.withNewSchema(logicalReadSchema);
-                        break;
-                    case ColumnMapping.COLUMN_MAPPING_MODE_NONE:
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Column mapping mode is not yet supported: " + 
columnMappingMode);
-                }
-
-                return new FilteredColumnarBatch(nextDataBatch, 
selectionVector);
-            }
-        };
-    }
-
-    public static ColumnarBatch withPartitionColumns(ExpressionHandler 
expressionHandler, ColumnarBatch dataBatch,
-            Map<String, String> partitionValues, StructType 
schemaWithPartitionCols) {
-        if (partitionValues == null || partitionValues.size() == 0) {
-            // no partition column vectors to attach to.
-            return dataBatch;
-        }
-
-        for (int colIdx = 0; colIdx < schemaWithPartitionCols.length(); 
colIdx++) {
-            StructField structField = schemaWithPartitionCols.at(colIdx);
-
-            if (partitionValues.containsKey(structField.getName())) {
-                // Create a partition vector
-
-                ExpressionEvaluator evaluator = 
expressionHandler.getEvaluator(dataBatch.getSchema(),
-                        literalForPartitionValue(structField.getDataType(), 
partitionValues.get(structField.getName())),
-                        structField.getDataType());
-
-                ColumnVector partitionVector = evaluator.eval(dataBatch);
-                dataBatch = dataBatch.withNewColumn(colIdx, structField, 
partitionVector);
-            }
-        }
-
-        return dataBatch;
-    }
-
-    protected static Literal literalForPartitionValue(DataType dataType, 
String partitionValue) {
-        if (partitionValue == null) {
-            return Literal.ofNull(dataType);
-        }
-
-        if (dataType instanceof BooleanType) {
-            return Literal.ofBoolean(Boolean.parseBoolean(partitionValue));
-        }
-        if (dataType instanceof ByteType) {
-            return Literal.ofByte(Byte.parseByte(partitionValue));
-        }
-        if (dataType instanceof ShortType) {
-            return Literal.ofShort(Short.parseShort(partitionValue));
-        }
-        if (dataType instanceof IntegerType) {
-            return Literal.ofInt(Integer.parseInt(partitionValue));
-        }
-        if (dataType instanceof LongType) {
-            return Literal.ofLong(Long.parseLong(partitionValue));
-        }
-        if (dataType instanceof FloatType) {
-            return Literal.ofFloat(Float.parseFloat(partitionValue));
-        }
-        if (dataType instanceof DoubleType) {
-            return Literal.ofDouble(Double.parseDouble(partitionValue));
-        }
-        if (dataType instanceof StringType) {
-            return Literal.ofString(partitionValue);
-        }
-        if (dataType instanceof BinaryType) {
-            return Literal.ofBinary(partitionValue.getBytes());
-        }
-        if (dataType instanceof DateType) {
-            return 
Literal.ofDate(InternalUtils.daysSinceEpoch(Date.valueOf(partitionValue)));
-        }
-        if (dataType instanceof DecimalType) {
-            DecimalType decimalType = (DecimalType) dataType;
-            return Literal.ofDecimal(new BigDecimal(partitionValue), 
decimalType.getPrecision(),
-                    decimalType.getScale());
-        }
-        if (dataType instanceof TimestampType) {
-            try {
-                Timestamp timestamp = Timestamp.valueOf(partitionValue);
-                return 
Literal.ofTimestamp(InternalUtils.microsSinceEpoch(timestamp));
-            } catch (IllegalArgumentException e) {
-                Instant instant = Instant.parse(partitionValue);
-                return 
Literal.ofTimestamp(ChronoUnit.MICROS.between(Instant.EPOCH, instant));
-            }
-        }
-        if (dataType instanceof TimestampNTZType) {
-            // Both the timestamp and timestamp_ntz have no timezone info, so 
they are interpreted
-            // in local time zone.
-            try {
-                Timestamp timestamp = Timestamp.valueOf(partitionValue);
-                return 
Literal.ofTimestampNtz(InternalUtils.microsSinceEpoch(timestamp));
-            } catch (IllegalArgumentException e) {
-                Instant instant = Instant.parse(partitionValue);
-                return 
Literal.ofTimestampNtz(ChronoUnit.MICROS.between(Instant.EPOCH, instant));
-            }
-        }
-
-        throw new UnsupportedOperationException("Unsupported partition column: 
" + dataType);
-    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java
deleted file mode 100644
index 494d3bc5bf..0000000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.aws.delta;
-
-import java.util.Optional;
-
-import io.delta.kernel.data.ColumnVector;
-import io.delta.kernel.data.ColumnarBatch;
-import io.delta.kernel.defaults.internal.data.vector.DefaultConstantVector;
-import io.delta.kernel.expressions.And;
-import io.delta.kernel.expressions.Column;
-import io.delta.kernel.expressions.ExpressionEvaluator;
-import io.delta.kernel.expressions.Literal;
-import io.delta.kernel.expressions.Predicate;
-import io.delta.kernel.expressions.PredicateEvaluator;
-import io.delta.kernel.internal.util.Utils;
-import io.delta.kernel.types.BooleanType;
-import io.delta.kernel.types.StructField;
-import io.delta.kernel.types.StructType;
-
-public class DeltaPredicateEvaluator implements PredicateEvaluator {
-
-    private final ExpressionEvaluator expressionEvaluator;
-    private static final String EXISTING_SEL_VECTOR_COL_NAME = 
"____existing_selection_vector_value____";
-    private static final StructField EXISTING_SEL_VECTOR_FIELD =
-            new StructField(EXISTING_SEL_VECTOR_COL_NAME, BooleanType.BOOLEAN, 
false);
-
-    public DeltaPredicateEvaluator(StructType inputSchema, Predicate 
predicate) {
-        Predicate rewrittenPredicate = new And(
-                new Predicate("=", new Column(EXISTING_SEL_VECTOR_COL_NAME), 
Literal.ofBoolean(true)), predicate);
-        StructType rewrittenInputSchema = 
inputSchema.add(EXISTING_SEL_VECTOR_FIELD);
-        this.expressionEvaluator =
-                new DeltaExpressionEvaluator(rewrittenInputSchema, 
rewrittenPredicate, BooleanType.BOOLEAN);
-    }
-
-    @Override
-    public ColumnVector eval(ColumnarBatch inputData, Optional<ColumnVector> 
existingSelectionVector) {
-        try {
-            ColumnVector newVector = existingSelectionVector
-                    .orElse(new DefaultConstantVector(BooleanType.BOOLEAN, 
inputData.getSize(), true));
-            ColumnarBatch withExistingSelVector =
-                    inputData.withNewColumn(inputData.getSchema().length(), 
EXISTING_SEL_VECTOR_FIELD, newVector);
-
-            return expressionEvaluator.eval(withExistingSelVector);
-        } finally {
-            // release the existing selection vector.
-            Utils.closeCloseables(existingSelectionVector.orElse(null));
-        }
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 8328ec323c..03285efbe9 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -60,6 +60,7 @@ import io.delta.kernel.Scan;
 import io.delta.kernel.Snapshot;
 import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
 import io.delta.kernel.exceptions.KernelEngineException;
 import io.delta.kernel.exceptions.KernelException;
@@ -104,7 +105,7 @@ public abstract class DeltaReaderFactory implements 
IRecordReaderFactory<Object>
         configureJobConf(appCtx, conf, configuration);
         confFactory = new ConfFactory(conf);
         String tableMetadataPath = getTablePath(configuration);
-        Engine engine = DeltaEngine.create(conf);
+        Engine engine = DefaultEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, 
tableMetadataPath);
         Snapshot snapshot;
         try {
@@ -122,7 +123,7 @@ public abstract class DeltaReaderFactory implements 
IRecordReaderFactory<Object>
             ARecordType expectedType = HDFSUtils.getExpectedType(conf);
             Map<String, FunctionCallInformation> functionCallInformationMap =
                     HDFSUtils.getFunctionCallInformationMap(conf);
-            StructType fileSchema = snapshot.getSchema(engine);
+            StructType fileSchema = snapshot.getSchema();
             requiredSchema = visitor.clipType(expectedType, fileSchema, 
functionCallInformationMap);
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -132,26 +133,26 @@ public abstract class DeltaReaderFactory implements 
IRecordReaderFactory<Object>
         Expression filterExpression = ((DeltaTableFilterEvaluatorFactory) 
filterEvaluatorFactory).getFilterExpression();
         Scan scan;
         if (filterExpression != null) {
-            scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema)
-                    .withFilter(engine, (Predicate) filterExpression).build();
+            scan = 
snapshot.getScanBuilder().withReadSchema(requiredSchema).withFilter((Predicate) 
filterExpression)
+                    .build();
             if (scan.getRemainingFilter().isPresent()) {
                 filterExpressionStr = 
PredicateSerDe.serializeExpressionToJson(scan.getRemainingFilter().get());
             } else {
                 filterExpressionStr = null;
             }
         } else {
-            scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema).build();
+            scan = 
snapshot.getScanBuilder().withReadSchema(requiredSchema).build();
             filterExpressionStr = null;
         }
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         List<Row> scanFiles;
         try {
             scanFiles = getScanFiles(scan, engine);
-        } catch (UnsupportedOperationException | IllegalStateException e) {
+        } catch (UnsupportedOperationException | IllegalStateException | 
KernelEngineException e) {
             // Delta kernel API failed to apply expression due to type 
mismatch.
             // We need to fall back to skip applying the filter and return all 
files.
             LOGGER.info("Exception encountered while getting delta table files 
to scan {}", e.getMessage());
-            scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema).build();
+            scan = 
snapshot.getScanBuilder().withReadSchema(requiredSchema).build();
             filterExpressionStr = null;
             scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
             scanFiles = getScanFiles(scan, engine);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java
deleted file mode 100644
index 388a287bd2..0000000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.aws.delta;
-
-import static 
io.delta.kernel.defaults.internal.DefaultEngineErrors.unsupportedExpressionException;
-import static io.delta.kernel.internal.util.Preconditions.checkArgument;
-import static java.lang.String.format;
-import static 
org.apache.asterix.external.input.record.reader.aws.delta.ImplicitCastExpression.canCastTo;
-
-import java.util.Arrays;
-
-import io.delta.kernel.data.ColumnVector;
-import io.delta.kernel.data.MapValue;
-import io.delta.kernel.expressions.Expression;
-import io.delta.kernel.expressions.ScalarExpression;
-import io.delta.kernel.internal.util.Utils;
-import io.delta.kernel.types.DataType;
-import io.delta.kernel.types.MapType;
-import io.delta.kernel.types.StringType;
-
-/**
- * Utility methods to evaluate {@code element_at} expression.
- */
-class ElementAtEvaluator {
-    private ElementAtEvaluator() {
-    }
-
-    /**
-     * Validate and transform the {@code element_at} expression with given 
validated and
-     * transformed inputs.
-     */
-    static ScalarExpression validateAndTransform(ScalarExpression elementAt, 
Expression mapInput, DataType mapInputType,
-            Expression lookupKey, DataType lookupKeyType) {
-
-        MapType asMapType = validateSupportedMapType(elementAt, mapInputType);
-        DataType keyTypeFromMapInput = asMapType.getKeyType();
-
-        if (!keyTypeFromMapInput.equivalent(lookupKeyType)) {
-            if (canCastTo(lookupKeyType, keyTypeFromMapInput)) {
-                lookupKey = new ImplicitCastExpression(lookupKey, 
keyTypeFromMapInput);
-            } else {
-                String reason = format("lookup key type (%s) is different from 
the map key type (%s)", lookupKeyType,
-                        asMapType.getKeyType());
-                throw unsupportedExpressionException(elementAt, reason);
-            }
-        }
-        return new ScalarExpression(elementAt.getName(), 
Arrays.asList(mapInput, lookupKey));
-    }
-
-    /**
-     * Utility method to evaluate the {@code element_at} on given map and key 
vectors.
-     * @param map {@link ColumnVector} of {@code map(string, string)} type.
-     * @param lookupKey {@link ColumnVector} of {@code string} type.
-     * @return result {@link ColumnVector} containing the lookup values.
-     */
-    static ColumnVector eval(ColumnVector map, ColumnVector lookupKey) {
-        return new ColumnVector() {
-            // Store the last lookup value to avoid multiple looks up for same 
row id.
-            // The general pattern is call `isNullAt(rowId)` followed by 
`getString`.
-            // So the cache of one value is enough.
-            private int lastLookupRowId = -1;
-            private String lastLookupValue = null;
-
-            @Override
-            public DataType getDataType() {
-                return ((MapType) map.getDataType()).getValueType();
-            }
-
-            @Override
-            public int getSize() {
-                return map.getSize();
-            }
-
-            @Override
-            public void close() {
-                Utils.closeCloseables(map, lookupKey);
-            }
-
-            @Override
-            public boolean isNullAt(int rowId) {
-                if (rowId == lastLookupRowId) {
-                    return lastLookupValue == null;
-                }
-                return map.isNullAt(rowId) || lookupValue(rowId) == null;
-            }
-
-            @Override
-            public String getString(int rowId) {
-                lookupValue(rowId);
-                return lastLookupValue == null ? null : lastLookupValue;
-            }
-
-            private Object lookupValue(int rowId) {
-                if (rowId == lastLookupRowId) {
-                    return lastLookupValue;
-                }
-                lastLookupRowId = rowId;
-                String keyValue = lookupKey.getString(rowId);
-                lastLookupValue = findValueForKey(map.getMap(rowId), keyValue);
-                return lastLookupValue;
-            }
-
-            /**
-             * Given a {@link MapValue} and string {@code key} find the 
corresponding value.
-             * Returns null if the key is not in the map.
-             * @param mapValue String->String map to search
-             * @param key the key to look up the value for; may be null
-             */
-            private String findValueForKey(MapValue mapValue, String key) {
-                ColumnVector keyVector = mapValue.getKeys();
-                for (int i = 0; i < mapValue.getSize(); i++) {
-                    if ((keyVector.isNullAt(i) && key == null)
-                            || (!keyVector.isNullAt(i) && 
keyVector.getString(i).equals(key))) {
-                        return mapValue.getValues().isNullAt(i) ? null : 
mapValue.getValues().getString(i);
-                    }
-                }
-                // If the key is not in the map return null
-                return null;
-            }
-        };
-    }
-
-    private static MapType validateSupportedMapType(Expression elementAt, 
DataType mapInputType) {
-        checkArgument(mapInputType instanceof MapType, "expected a map type 
input as first argument: " + elementAt);
-        MapType asMapType = (MapType) mapInputType;
-        // For now we only need to support lookup in columns of type 
`map(string -> string)`.
-        // Additional type support may be added later
-        if (asMapType.getKeyType().equivalent(StringType.STRING)
-                && asMapType.getValueType().equivalent(StringType.STRING)) {
-            return asMapType;
-        }
-        throw new UnsupportedOperationException(
-                format("%s: Supported only on type map(string, string) input 
data", elementAt));
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java
deleted file mode 100644
index 7e2a65be6d..0000000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.aws.delta;
-
-import static io.delta.kernel.expressions.AlwaysFalse.ALWAYS_FALSE;
-import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;
-import static java.util.stream.Collectors.joining;
-
-import java.util.List;
-import java.util.Locale;
-
-import io.delta.kernel.expressions.AlwaysFalse;
-import io.delta.kernel.expressions.AlwaysTrue;
-import io.delta.kernel.expressions.And;
-import io.delta.kernel.expressions.Column;
-import io.delta.kernel.expressions.Expression;
-import io.delta.kernel.expressions.Literal;
-import io.delta.kernel.expressions.Or;
-import io.delta.kernel.expressions.PartitionValueExpression;
-import io.delta.kernel.expressions.Predicate;
-import io.delta.kernel.expressions.ScalarExpression;
-
-/**
- * Interface to allow visiting an expression tree and implementing handling 
for each
- * specific expression type.
- *
- * @param <R> Return type of result of visit expression methods.
- */
-abstract class ExpressionVisitor<R> {
-
-    abstract R visitAnd(And and);
-
-    abstract R visitOr(Or or);
-
-    abstract R visitAlwaysTrue(AlwaysTrue alwaysTrue);
-
-    abstract R visitAlwaysFalse(AlwaysFalse alwaysFalse);
-
-    abstract R visitComparator(Predicate predicate);
-
-    abstract R visitLiteral(Literal literal);
-
-    abstract R visitColumn(Column column);
-
-    abstract R visitCast(ImplicitCastExpression cast);
-
-    abstract R visitPartitionValue(PartitionValueExpression partitionValue);
-
-    abstract R visitElementAt(ScalarExpression elementAt);
-
-    abstract R visitNot(Predicate predicate);
-
-    abstract R visitIsNotNull(Predicate predicate);
-
-    abstract R visitIsNull(Predicate predicate);
-
-    abstract R visitCoalesce(ScalarExpression ifNull);
-
-    final R visit(Expression expression) {
-        if (expression instanceof PartitionValueExpression) {
-            return visitPartitionValue((PartitionValueExpression) expression);
-        } else if (expression instanceof ScalarExpression) {
-            return visitScalarExpression((ScalarExpression) expression);
-        } else if (expression instanceof Literal) {
-            return visitLiteral((Literal) expression);
-        } else if (expression instanceof Column) {
-            return visitColumn((Column) expression);
-        } else if (expression instanceof ImplicitCastExpression) {
-            return visitCast((ImplicitCastExpression) expression);
-        }
-
-        throw new UnsupportedOperationException(String.format("Expression %s 
is not supported.", expression));
-    }
-
-    private R visitScalarExpression(ScalarExpression expression) {
-        List<Expression> children = expression.getChildren();
-        String name = expression.getName().toUpperCase(Locale.ENGLISH);
-        switch (name) {
-            case "ALWAYS_TRUE":
-                return visitAlwaysTrue(ALWAYS_TRUE);
-            case "ALWAYS_FALSE":
-                return visitAlwaysFalse(ALWAYS_FALSE);
-            case "AND":
-                return visitAnd(new And(elemAsPredicate(children, 0), 
elemAsPredicate(children, 1)));
-            case "OR":
-                return visitOr(new Or(elemAsPredicate(children, 0), 
elemAsPredicate(children, 1)));
-            case "=":
-            case "<":
-            case "<=":
-            case ">":
-            case ">=":
-                return visitComparator(new Predicate(name, children));
-            case "ELEMENT_AT":
-                return visitElementAt(expression);
-            case "NOT":
-                return visitNot(new Predicate(name, children));
-            case "IS_NOT_NULL":
-                return visitIsNotNull(new Predicate(name, children));
-            case "IS_NULL":
-                return visitIsNull(new Predicate(name, children));
-            case "COALESCE":
-                return visitCoalesce(expression);
-            default:
-                throw new UnsupportedOperationException(
-                        String.format("Scalar expression `%s` is not 
supported.", name));
-        }
-    }
-
-    private static Predicate elemAsPredicate(List<Expression> expressions, int 
index) {
-        if (expressions.size() <= index) {
-            throw new RuntimeException(String.format("Trying to access invalid 
entry (%d) in list %s", index,
-                    
expressions.stream().map(Object::toString).collect(joining(","))));
-        }
-        Expression elemExpression = expressions.get(index);
-        if (!(elemExpression instanceof Predicate)) {
-            throw new RuntimeException("Expected a predicate, but got " + 
elemExpression);
-        }
-        return (Predicate) expressions.get(index);
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java
deleted file mode 100644
index febd4d3160..0000000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.aws.delta;
-
-import static java.lang.String.format;
-import static java.util.Collections.unmodifiableMap;
-import static java.util.Objects.requireNonNull;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import io.delta.kernel.data.ColumnVector;
-import io.delta.kernel.expressions.Expression;
-import io.delta.kernel.types.DataType;
-
-/**
- * An implicit cast expression to convert the input type to another given 
type. Here is the valid
- * list of casts
- * <p>
- *  <ul>
- *    <li>{@code byte} to {@code short, int, long, float, double}</li>
- *    <li>{@code short} to {@code int, long, float, double}</li>
- *    <li>{@code int} to {@code long, float, double}</li>
- *    <li>{@code long} to {@code float, double}</li>
- *    <li>{@code float} to {@code double}</li>
- *  </ul>
- *
- * <p>
- * The above list is not exhaustive. Based on the need, we can add more casts.
- * <p>
- */
-final class ImplicitCastExpression implements Expression {
-    private final Expression input;
-    private final DataType outputType;
-
-    /**
-     * Create a cast around the given input expression to specified output data
-     * type. It is the responsibility of the caller to validate the input 
expression can be cast
-     * to the new type using {@link #canCastTo(DataType, DataType)}
-     */
-    ImplicitCastExpression(Expression input, DataType outputType) {
-        this.input = requireNonNull(input, "input is null");
-        this.outputType = requireNonNull(outputType, "outputType is null");
-    }
-
-    public Expression getInput() {
-        return input;
-    }
-
-    public DataType getOutputType() {
-        return outputType;
-    }
-
-    @Override
-    public List<Expression> getChildren() {
-        return Collections.singletonList(input);
-    }
-
-    /**
-     * Evaluate the given column expression on the input {@link ColumnVector}.
-     *
-     * @param input {@link ColumnVector} data of the input to the cast 
expression.
-     * @return {@link ColumnVector} result applying target type casting on 
every element in the
-     * input {@link ColumnVector}.
-     */
-    ColumnVector eval(ColumnVector input) {
-        String fromTypeStr = input.getDataType().toString();
-        switch (fromTypeStr) {
-            case "byte":
-                return new ByteUpConverter(outputType, input);
-            case "short":
-                return new ShortUpConverter(outputType, input);
-            case "integer":
-                return new IntUpConverter(outputType, input);
-            case "long":
-                return new LongUpConverter(outputType, input);
-            case "float":
-                return new FloatUpConverter(outputType, input);
-            default:
-                throw new UnsupportedOperationException(format("Cast from %s 
is not supported", fromTypeStr));
-        }
-    }
-
-    /**
-     * Map containing for each type what are the target cast types can be.
-     */
-    private static final Map<String, List<String>> UP_CASTABLE_TYPE_TABLE;
-
-    static {
-        Map<String, List<String>> map = new HashMap<>();
-        map.put("byte", Arrays.asList("short", "integer", "long", "float", 
"double"));
-        map.put("short", Arrays.asList("integer", "long", "float", "double"));
-        map.put("integer", Arrays.asList("long", "float", "double"));
-        map.put("long", Arrays.asList("float", "double"));
-        map.put("float", Arrays.asList("double"));
-        UP_CASTABLE_TYPE_TABLE = unmodifiableMap(map);
-    }
-
-    /**
-     * Utility method which returns whether the given {@code from} type can be 
cast to {@code to}
-     * type.
-     */
-    static boolean canCastTo(DataType from, DataType to) {
-        // TODO: The type name should be a first class method on `DataType` 
instead of getting it
-        // using the `toString`.
-        String fromStr = from.toString();
-        String toStr = to.toString();
-        return UP_CASTABLE_TYPE_TABLE.containsKey(fromStr) && 
UP_CASTABLE_TYPE_TABLE.get(fromStr).contains(toStr);
-    }
-
-    /**
-     * Base class for up casting {@link ColumnVector} data.
-     */
-    private abstract static class UpConverter implements ColumnVector {
-        protected final DataType targetType;
-        protected final ColumnVector inputVector;
-
-        UpConverter(DataType targetType, ColumnVector inputVector) {
-            this.targetType = targetType;
-            this.inputVector = inputVector;
-        }
-
-        @Override
-        public DataType getDataType() {
-            return targetType;
-        }
-
-        @Override
-        public boolean isNullAt(int rowId) {
-            return inputVector.isNullAt(rowId);
-        }
-
-        @Override
-        public int getSize() {
-            return inputVector.getSize();
-        }
-
-        @Override
-        public void close() {
-            inputVector.close();
-        }
-    }
-
-    private static class ByteUpConverter extends UpConverter {
-        ByteUpConverter(DataType targetType, ColumnVector inputVector) {
-            super(targetType, inputVector);
-        }
-
-        @Override
-        public short getShort(int rowId) {
-            return inputVector.getByte(rowId);
-        }
-
-        @Override
-        public int getInt(int rowId) {
-            return inputVector.getByte(rowId);
-        }
-
-        @Override
-        public long getLong(int rowId) {
-            return inputVector.getByte(rowId);
-        }
-
-        @Override
-        public float getFloat(int rowId) {
-            return inputVector.getByte(rowId);
-        }
-
-        @Override
-        public double getDouble(int rowId) {
-            return inputVector.getByte(rowId);
-        }
-    }
-
-    private static class ShortUpConverter extends UpConverter {
-        ShortUpConverter(DataType targetType, ColumnVector inputVector) {
-            super(targetType, inputVector);
-        }
-
-        @Override
-        public int getInt(int rowId) {
-            return inputVector.getShort(rowId);
-        }
-
-        @Override
-        public long getLong(int rowId) {
-            return inputVector.getShort(rowId);
-        }
-
-        @Override
-        public float getFloat(int rowId) {
-            return inputVector.getShort(rowId);
-        }
-
-        @Override
-        public double getDouble(int rowId) {
-            return inputVector.getShort(rowId);
-        }
-    }
-
-    private static class IntUpConverter extends UpConverter {
-        IntUpConverter(DataType targetType, ColumnVector inputVector) {
-            super(targetType, inputVector);
-        }
-
-        @Override
-        public long getLong(int rowId) {
-            return inputVector.getInt(rowId);
-        }
-
-        @Override
-        public float getFloat(int rowId) {
-            return inputVector.getInt(rowId);
-        }
-
-        @Override
-        public double getDouble(int rowId) {
-            return inputVector.getInt(rowId);
-        }
-    }
-
-    private static class LongUpConverter extends UpConverter {
-        LongUpConverter(DataType targetType, ColumnVector inputVector) {
-            super(targetType, inputVector);
-        }
-
-        @Override
-        public float getFloat(int rowId) {
-            return inputVector.getLong(rowId);
-        }
-
-        @Override
-        public double getDouble(int rowId) {
-            return inputVector.getLong(rowId);
-        }
-    }
-
-    private static class FloatUpConverter extends UpConverter {
-        FloatUpConverter(DataType targetType, ColumnVector inputVector) {
-            super(targetType, inputVector);
-        }
-
-        @Override
-        public double getDouble(int rowId) {
-            return inputVector.getFloat(rowId);
-        }
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java
deleted file mode 100644
index c24b225e7a..0000000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.aws.delta;
-
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-
-import io.delta.kernel.data.ColumnVector;
-import io.delta.kernel.internal.util.InternalUtils;
-import io.delta.kernel.types.DataType;
-import io.delta.kernel.types.DateType;
-import io.delta.kernel.types.IntegerType;
-import io.delta.kernel.types.LongType;
-import io.delta.kernel.types.TimestampNTZType;
-import io.delta.kernel.types.TimestampType;
-
-/**
- * Utility methods to evaluate {@code partition_value} expression
- */
-class PartitionValueEvaluator {
-    /**
-     * Evaluate the {@code partition_value} expression for given input column vector and generate
-     * a column vector with decoded values according to the given partition type.
-     */
-    static ColumnVector eval(ColumnVector input, DataType partitionType) {
-        return new ColumnVector() {
-            @Override
-            public DataType getDataType() {
-                return partitionType;
-            }
-
-            @Override
-            public int getSize() {
-                return input.getSize();
-            }
-
-            @Override
-            public void close() {
-                input.close();
-            }
-
-            @Override
-            public boolean isNullAt(int rowId) {
-                return input.isNullAt(rowId);
-            }
-
-            @Override
-            public boolean getBoolean(int rowId) {
-                return Boolean.parseBoolean(input.getString(rowId));
-            }
-
-            @Override
-            public byte getByte(int rowId) {
-                return Byte.parseByte(input.getString(rowId));
-            }
-
-            @Override
-            public short getShort(int rowId) {
-                return Short.parseShort(input.getString(rowId));
-            }
-
-            @Override
-            public int getInt(int rowId) {
-                if (partitionType.equivalent(IntegerType.INTEGER)) {
-                    return Integer.parseInt(input.getString(rowId));
-                } else if (partitionType.equivalent(DateType.DATE)) {
-                    return InternalUtils.daysSinceEpoch(Date.valueOf(input.getString(rowId)));
-                }
-                throw new UnsupportedOperationException("Invalid value request for data type");
-            }
-
-            @Override
-            public long getLong(int rowId) {
-                if (partitionType.equivalent(LongType.LONG)) {
-                    return Long.parseLong(input.getString(rowId));
-                } else if (partitionType.equivalent(TimestampType.TIMESTAMP)
-                        || partitionType.equivalent(TimestampNTZType.TIMESTAMP_NTZ)) {
-                    // Both the timestamp and timestamp_ntz have no timezone info,
-                    // so they are interpreted in local time zone.
-                    try {
-                        Timestamp timestamp = Timestamp.valueOf(input.getString(rowId));
-                        return InternalUtils.microsSinceEpoch(timestamp);
-                    } catch (IllegalArgumentException e) {
-                        Instant instant = Instant.parse(input.getString(rowId));
-                        return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
-                    }
-                }
-                throw new UnsupportedOperationException("Invalid value request for data type");
-            }
-
-            @Override
-            public float getFloat(int rowId) {
-                return Float.parseFloat(input.getString(rowId));
-            }
-
-            @Override
-            public double getDouble(int rowId) {
-                return Double.parseDouble(input.getString(rowId));
-            }
-
-            @Override
-            public byte[] getBinary(int rowId) {
-                return input.isNullAt(rowId) ? null : input.getString(rowId).getBytes();
-            }
-
-            @Override
-            public String getString(int rowId) {
-                return input.getString(rowId);
-            }
-
-            @Override
-            public BigDecimal getDecimal(int rowId) {
-                return input.isNullAt(rowId) ? null : new BigDecimal(input.getString(rowId));
-            }
-        };
-    }
-}
diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml
index fc3950bd93..6d8a19bc27 100644
--- a/hyracks-fullstack/pom.xml
+++ b/hyracks-fullstack/pom.xml
@@ -474,6 +474,11 @@
         <artifactId>jackson-datatype-jsr310</artifactId>
         <version>${jackson.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.datatype</groupId>
+        <artifactId>jackson-datatype-jdk8</artifactId>
+        <version>${jackson.version}</version>
+      </dependency>
       <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
@@ -677,6 +682,11 @@
         <artifactId>hadoop-client-runtime</artifactId>
         <version>3.4.1</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-client-api</artifactId>
+        <version>3.4.1</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <build>


Reply via email to