This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit c42cd2602c8dbb3090b5056db8db29f00e848cfd Author: Peeyush Gupta <[email protected]> AuthorDate: Mon Mar 17 14:58:33 2025 -0700 [ASTERIXDB-3576][EXT] push predicates down to delta tables to filter row groups - user model changes: no - storage format changes: no - interface changes: no Details: Delta table's data files are essentially Parquet files. Parquet allows applying a predicate while reading data files to skip row groups. With this patch we pushdown filters to individual parquet files of the Delta table to filter row groups. The Predicate class of the Delta Kernel API is not serializable, so we have added a custom serialization/de-serialization of Delta kernel APIs Predicates. Ext-ref: MB-65315 Change-Id: I9fa1a84d7be63ada7b9768a81984b2172e7401b3 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19527 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Peeyush Gupta <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Jenkins <[email protected]> --- .../reader/aws/delta/DeltaFileRecordReader.java | 16 ++- .../reader/aws/delta/DeltaReaderFactory.java | 10 +- .../record/reader/aws/delta/PredicateSerDe.java | 134 +++++++++++++++++++++ 3 files changed, 154 insertions(+), 6 deletions(-) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java index a094c221e5..121a76b2e8 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java @@ -19,12 +19,15 @@ package org.apache.asterix.external.input.record.reader.aws.delta; import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; @@ -41,6 +44,7 @@ import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.expressions.Predicate; import io.delta.kernel.internal.InternalScanFileUtils; import io.delta.kernel.internal.data.ScanStateRow; import io.delta.kernel.types.StructType; @@ -65,9 +69,10 @@ public class DeltaFileRecordReader implements IRecordReader<Row> { private int fileIndex; private Row scanFile; private CloseableIterator<Row> rows; + private Optional<Predicate> filterPredicate; - public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config) - throws HyracksDataException { + public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config, + String filterExpressionStr) throws HyracksDataException { JobConf conf = config.getConf(); this.engine = DefaultEngine.create(conf); this.scanFiles = new ArrayList<>(); @@ -85,15 +90,16 @@ public class DeltaFileRecordReader implements IRecordReader<Row> { this.scanFile = scanFiles.get(0); this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile); this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState); + this.filterPredicate = PredicateSerDe.deserializeExpressionFromJson(filterExpressionStr); try { this.physicalDataIter = engine.getParquetHandler() - .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, Optional.empty()); + .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate); this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter); if (dataIter.hasNext()) { rows = dataIter.next().getRows(); } } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); } } } @@ -122,7 +128,7 @@ public class DeltaFileRecordReader implements IRecordReader<Row> { physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState); try { physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus), - physicalReadSchema, Optional.empty()); + physicalReadSchema, filterPredicate); dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter); } catch (IOException e) { throw HyracksDataException.create(e); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java index 4e902b9943..b76dd4dbe4 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java @@ -79,6 +79,7 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> private String scanState; protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>(); protected ConfFactory confFactory; + private String filterExpressionStr; public List<PartitionWorkLoadBasedOnSize> getPartitionWorkLoadsBasedOnSize() { return partitionWorkLoadsBasedOnSize; @@ -133,8 +134,14 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> if (filterExpression != null) { scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema) .withFilter(engine, (Predicate) filterExpression).build(); + if (scan.getRemainingFilter().isPresent()) { + filterExpressionStr = PredicateSerDe.serializeExpressionToJson(scan.getRemainingFilter().get()); + } else { + filterExpressionStr = null; + } } else { scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); + filterExpressionStr = null; } scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); List<Row> scanFiles; @@ -145,6 +152,7 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> // We need to fall back to skip applying the filter and return all files. LOGGER.info("Exception encountered while getting delta table files to scan {}", e.getMessage()); scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); + filterExpressionStr = null; scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); scanFiles = getScanFiles(scan, engine); } @@ -206,7 +214,7 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> try { int partition = context.getPartition(); return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState, - confFactory); + confFactory, filterExpressionStr); } catch (Exception e) { throw HyracksDataException.create(e); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java new file mode 100644 index 0000000000..efd47f58ff --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java @@ -0,0 +1,134 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.asterix.external.input.record.reader.aws.delta; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.StreamSupport; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import io.delta.kernel.expressions.Column; +import io.delta.kernel.expressions.Expression; +import io.delta.kernel.expressions.Literal; +import io.delta.kernel.expressions.Predicate; + +/** + * Utility class to serialize and deserialize {@link Predicate} object. + */ +public class PredicateSerDe { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private PredicateSerDe() { + } + + public static String serializeExpressionToJson(Expression expression) { + Map<String, Object> expressionObject = visitExpression(expression); + try { + return OBJECT_MAPPER.writeValueAsString(expressionObject); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + public static Optional<Predicate> deserializeExpressionFromJson(String jsonExpression) { + try { + if (jsonExpression == null) { + return Optional.empty(); + } + JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonExpression); + return Optional.of((Predicate) visitExpression((ObjectNode) jsonNode)); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + } + + public static Map<String, Object> visitPredicate(Predicate predicate) { + Map<String, Object> predicateObject = new HashMap<>(); + predicateObject.put("type", "predicate"); + predicateObject.put("name", predicate.getName()); + predicateObject.put("left", visitExpression(predicate.getChildren().get(0))); + predicateObject.put("right", visitExpression(predicate.getChildren().get(1))); + return predicateObject; + } + + public static Map<String, Object> visitLiteral(Literal literal) { + Map<String, Object> literalObject = new HashMap<>(); + literalObject.put("type", "literal"); + literalObject.put("dataType", literal.getDataType().toString()); + literalObject.put("value", literal.getValue()); + return literalObject; + } + + public static Map<String, Object> visitColumn(Column column) { + Map<String, Object> columnObject = new HashMap<>(); + columnObject.put("type", "column"); + columnObject.put("names", column.getNames()); + return columnObject; + } + + private static Map<String, Object> visitExpression(Expression expression) { + return switch (expression) { + case Predicate predicate -> visitPredicate(predicate); + case Column column -> visitColumn(column); + case Literal literal -> visitLiteral(literal); + case null, default -> throw new UnsupportedOperationException("Unsupported expression type: " + expression); + }; + } + + public static Predicate visitPredicate(ObjectNode node) { + return new Predicate(node.get("name").asText(), visitExpression((ObjectNode) node.get("left")), + visitExpression((ObjectNode) node.get("right"))); + } + + public static Literal visitLiteral(ObjectNode node) { + switch (node.get("dataType").asText()) { + case "boolean" : return Literal.ofBoolean(node.get("value").asBoolean()); + case "byte" : return Literal.ofByte((byte) node.get("value").asInt()); + case "short" : return Literal.ofShort(node.get("value").shortValue()); + case "integer" : return Literal.ofInt(node.get("value").asInt()); + case "long" : return Literal.ofLong(node.get("value").asLong()); + case "float" : return Literal.ofFloat(node.get("value").floatValue()); + case "double" : return Literal.ofDouble(node.get("value").doubleValue()); + case "date" : return Literal.ofDate(node.get("value").asInt()); + case "timestamp" : return Literal.ofTimestamp(node.get("value").asLong()); + case "string" : return Literal.ofString(node.get("value").asText()); + case null, default : throw new UnsupportedOperationException("Unsupported literal type: " + node.get("dataType").asText()); + } + } + + public static Column visitColumn(ObjectNode node) { + if (node.get("names").isArray()) { + return new Column(StreamSupport.stream(node.get("names").spliterator(), false).map(JsonNode::asText) + .toArray(String[]::new)); + } else { + return new Column(node.get("names").asText()); + } + } + + private static Expression visitExpression(ObjectNode node) { + return switch (node.get("type").asText()) { + case "predicate" -> visitPredicate(node); + case "column" -> visitColumn(node); + case "literal" -> visitLiteral(node); + case null, default -> throw new UnsupportedOperationException("Unsupported expression type: " + node.get("type").asText()); + }; + } +}
