This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit c42cd2602c8dbb3090b5056db8db29f00e848cfd
Author: Peeyush Gupta <[email protected]>
AuthorDate: Mon Mar 17 14:58:33 2025 -0700

    [ASTERIXDB-3576][EXT] push predicates down to Delta tables to filter row groups
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    A Delta table's data files are essentially Parquet files. Parquet allows
    a predicate to be applied while reading a data file so that row groups
    can be skipped. With this patch, we push filters down to the individual
    Parquet files of the Delta table to filter row groups. The Predicate
    class of the Delta Kernel API is not serializable, so we have added
    custom serialization/deserialization of Delta Kernel API predicates.
    
    Ext-ref: MB-65315
    Change-Id: I9fa1a84d7be63ada7b9768a81984b2172e7401b3
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19527
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Peeyush Gupta <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../reader/aws/delta/DeltaFileRecordReader.java    |  16 ++-
 .../reader/aws/delta/DeltaReaderFactory.java       |  10 +-
 .../record/reader/aws/delta/PredicateSerDe.java    | 134 +++++++++++++++++++++
 3 files changed, 154 insertions(+), 6 deletions(-)

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index a094c221e5..121a76b2e8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,12 +19,15 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
 import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
@@ -41,6 +44,7 @@ import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.internal.InternalScanFileUtils;
 import io.delta.kernel.internal.data.ScanStateRow;
 import io.delta.kernel.types.StructType;
@@ -65,9 +69,10 @@ public class DeltaFileRecordReader implements 
IRecordReader<Row> {
     private int fileIndex;
     private Row scanFile;
     private CloseableIterator<Row> rows;
+    private Optional<Predicate> filterPredicate;
 
-    public DeltaFileRecordReader(List<String> serScanFiles, String 
serScanState, ConfFactory config)
-            throws HyracksDataException {
+    public DeltaFileRecordReader(List<String> serScanFiles, String 
serScanState, ConfFactory config,
+            String filterExpressionStr) throws HyracksDataException {
         JobConf conf = config.getConf();
         this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
@@ -85,15 +90,16 @@ public class DeltaFileRecordReader implements 
IRecordReader<Row> {
             this.scanFile = scanFiles.get(0);
             this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
             this.physicalReadSchema = 
ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+            this.filterPredicate = 
PredicateSerDe.deserializeExpressionFromJson(filterExpressionStr);
             try {
                 this.physicalDataIter = engine.getParquetHandler()
-                        
.readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, 
Optional.empty());
+                        
.readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, 
filterPredicate);
                 this.dataIter = Scan.transformPhysicalData(engine, scanState, 
scanFile, physicalDataIter);
                 if (dataIter.hasNext()) {
                     rows = dataIter.next().getRows();
                 }
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                throw new 
RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, 
getMessageOrToString(e));
             }
         }
     }
@@ -122,7 +128,7 @@ public class DeltaFileRecordReader implements 
IRecordReader<Row> {
             physicalReadSchema = 
ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
             try {
                 physicalDataIter = 
engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
-                        physicalReadSchema, Optional.empty());
+                        physicalReadSchema, filterPredicate);
                 dataIter = Scan.transformPhysicalData(engine, scanState, 
scanFile, physicalDataIter);
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 4e902b9943..b76dd4dbe4 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -79,6 +79,7 @@ public abstract class DeltaReaderFactory implements 
IRecordReaderFactory<Object>
     private String scanState;
     protected final List<PartitionWorkLoadBasedOnSize> 
partitionWorkLoadsBasedOnSize = new ArrayList<>();
     protected ConfFactory confFactory;
+    private String filterExpressionStr;
 
     public List<PartitionWorkLoadBasedOnSize> 
getPartitionWorkLoadsBasedOnSize() {
         return partitionWorkLoadsBasedOnSize;
@@ -133,8 +134,14 @@ public abstract class DeltaReaderFactory implements 
IRecordReaderFactory<Object>
         if (filterExpression != null) {
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema)
                     .withFilter(engine, (Predicate) filterExpression).build();
+            if (scan.getRemainingFilter().isPresent()) {
+                filterExpressionStr = 
PredicateSerDe.serializeExpressionToJson(scan.getRemainingFilter().get());
+            } else {
+                filterExpressionStr = null;
+            }
         } else {
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema).build();
+            filterExpressionStr = null;
         }
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         List<Row> scanFiles;
@@ -145,6 +152,7 @@ public abstract class DeltaReaderFactory implements 
IRecordReaderFactory<Object>
             // We need to fall back to skip applying the filter and return all 
files.
             LOGGER.info("Exception encountered while getting delta table files 
to scan {}", e.getMessage());
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema).build();
+            filterExpressionStr = null;
             scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
             scanFiles = getScanFiles(scan, engine);
         }
@@ -206,7 +214,7 @@ public abstract class DeltaReaderFactory implements 
IRecordReaderFactory<Object>
         try {
             int partition = context.getPartition();
             return new 
DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(),
 scanState,
-                    confFactory);
+                    confFactory, filterExpressionStr);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
new file mode 100644
index 0000000000..efd47f58ff
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+
+/**
+ * Utility class to serialize and deserialize Delta Kernel {@link Predicate} objects.
+ * <p>
+ * The Delta Kernel {@link Predicate} class is not serializable, so a predicate is converted into a JSON tree built
+ * from three node kinds:
+ * <ul>
+ * <li>{@code {"type":"predicate","name":...,"children":[...]}}</li>
+ * <li>{@code {"type":"column","names":[...]}}</li>
+ * <li>{@code {"type":"literal","dataType":...,"value":...}}</li>
+ * </ul>
+ */
+public class PredicateSerDe {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private PredicateSerDe() {
+    }
+
+    /**
+     * Serializes an expression to JSON.
+     * <p>
+     * If the expression contains anything this class cannot rebuild exactly (an expression type other than
+     * predicate/column/literal, a null literal, or a literal type not handled by {@link #visitLiteral(ObjectNode)}),
+     * {@code null} is returned instead of failing. {@link #deserializeExpressionFromJson(String)} maps {@code null}
+     * to "no filter".
+     *
+     * @param expression the expression to serialize
+     * @return the JSON string, or {@code null} if the expression cannot be round-tripped
+     */
+    public static String serializeExpressionToJson(Expression expression) {
+        Map<String, Object> expressionObject;
+        try {
+            expressionObject = visitExpression(expression);
+        } catch (UnsupportedOperationException e) {
+            // NOTE(review): the predicate is only used to skip Parquet row groups; this assumes the query plan still
+            // evaluates the full filter on the rows read (the no-filter fallback in DeltaReaderFactory relies on the
+            // same), so pushing nothing down is safe whereas throwing here fails the whole query.
+            return null;
+        }
+        try {
+            return OBJECT_MAPPER.writeValueAsString(expressionObject);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Rebuilds a predicate produced by {@link #serializeExpressionToJson(Expression)}.
+     *
+     * @param jsonExpression the JSON string, or {@code null} when no filter was pushed down
+     * @return the predicate, or {@link Optional#empty()} if {@code jsonExpression} is {@code null}
+     */
+    public static Optional<Predicate> deserializeExpressionFromJson(String jsonExpression) {
+        if (jsonExpression == null) {
+            return Optional.empty();
+        }
+        try {
+            JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonExpression);
+            return Optional.of((Predicate) visitExpression((ObjectNode) jsonNode));
+        } catch (JsonProcessingException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Converts a predicate to its JSON tree. Every child is kept, so predicates with one child (e.g. {@code NOT})
+     * or no children are handled as well as binary ones.
+     */
+    public static Map<String, Object> visitPredicate(Predicate predicate) {
+        Map<String, Object> predicateObject = new HashMap<>();
+        predicateObject.put("type", "predicate");
+        predicateObject.put("name", predicate.getName());
+        predicateObject.put("children",
+                predicate.getChildren().stream().map(child -> visitExpression(child)).toList());
+        return predicateObject;
+    }
+
+    /**
+     * Converts a literal to its JSON tree.
+     *
+     * @throws UnsupportedOperationException if the literal's value is null or its type cannot be read back by
+     *             {@link #visitLiteral(ObjectNode)}; a null value would otherwise be read back as {@code 0},
+     *             {@code false} or {@code "null"} and silently change the predicate
+     */
+    public static Map<String, Object> visitLiteral(Literal literal) {
+        String dataType = literal.getDataType().toString();
+        Object value = literal.getValue();
+        if (value == null) {
+            throw new UnsupportedOperationException("Null literal of type: " + dataType);
+        }
+        if (!isSupportedLiteralType(dataType)) {
+            throw new UnsupportedOperationException("Unsupported literal type: " + dataType);
+        }
+        Map<String, Object> literalObject = new HashMap<>();
+        literalObject.put("type", "literal");
+        literalObject.put("dataType", dataType);
+        literalObject.put("value", value);
+        return literalObject;
+    }
+
+    /** Converts a column reference to its JSON tree (its name path). */
+    public static Map<String, Object> visitColumn(Column column) {
+        Map<String, Object> columnObject = new HashMap<>();
+        columnObject.put("type", "column");
+        columnObject.put("names", column.getNames());
+        return columnObject;
+    }
+
+    /** Dispatches an expression to the matching visitor; throws for any other expression type. */
+    private static Map<String, Object> visitExpression(Expression expression) {
+        return switch (expression) {
+            case Predicate predicate -> visitPredicate(predicate);
+            case Column column -> visitColumn(column);
+            case Literal literal -> visitLiteral(literal);
+            case null, default -> throw new UnsupportedOperationException("Unsupported expression type: " + expression);
+        };
+    }
+
+    /** Rebuilds a predicate (with all of its children) from its JSON tree. */
+    public static Predicate visitPredicate(ObjectNode node) {
+        return new Predicate(node.get("name").asText(),
+                StreamSupport.stream(node.get("children").spliterator(), false)
+                        .map(child -> visitExpression((ObjectNode) child)).toList());
+    }
+
+    /** Returns whether {@link #visitLiteral(ObjectNode)} can rebuild a literal of the given type name. */
+    private static boolean isSupportedLiteralType(String dataType) {
+        return switch (dataType) {
+            case "boolean", "byte", "short", "integer", "long", "float", "double", "date", "timestamp", "string" ->
+                true;
+            default -> false;
+        };
+    }
+
+    /**
+     * Rebuilds a literal from its JSON tree.
+     *
+     * @throws UnsupportedOperationException if the value is missing or null, or the type is not supported
+     */
+    public static Literal visitLiteral(ObjectNode node) {
+        String dataType = node.get("dataType").asText();
+        JsonNode value = node.get("value");
+        if (value == null || value.isNull()) {
+            // asInt()/asBoolean()/asText() would silently turn a null into 0/false/"null"
+            throw new UnsupportedOperationException("Null literal of type: " + dataType);
+        }
+        return switch (dataType) {
+            case "boolean" -> Literal.ofBoolean(value.asBoolean());
+            case "byte" -> Literal.ofByte((byte) value.asInt());
+            case "short" -> Literal.ofShort(value.shortValue());
+            case "integer" -> Literal.ofInt(value.asInt());
+            case "long" -> Literal.ofLong(value.asLong());
+            case "float" -> Literal.ofFloat(value.floatValue());
+            case "double" -> Literal.ofDouble(value.doubleValue());
+            case "date" -> Literal.ofDate(value.asInt());
+            case "timestamp" -> Literal.ofTimestamp(value.asLong());
+            case "string" -> Literal.ofString(value.asText());
+            default -> throw new UnsupportedOperationException("Unsupported literal type: " + dataType);
+        };
+    }
+
+    /** Rebuilds a column reference from either a name array or a single name. */
+    public static Column visitColumn(ObjectNode node) {
+        if (node.get("names").isArray()) {
+            return new Column(StreamSupport.stream(node.get("names").spliterator(), false).map(JsonNode::asText)
+                    .toArray(String[]::new));
+        } else {
+            return new Column(node.get("names").asText());
+        }
+    }
+
+    /** Dispatches a JSON node to the matching visitor based on its {@code type} field. */
+    private static Expression visitExpression(ObjectNode node) {
+        return switch (node.get("type").asText()) {
+            case "predicate" -> visitPredicate(node);
+            case "column" -> visitColumn(node);
+            case "literal" -> visitLiteral(node);
+            case null, default -> throw new UnsupportedOperationException(
+                    "Unsupported expression type: " + node.get("type").asText());
+        };
+    }
+}

Reply via email to