This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d4de459784940bd7f0443e051a3ff79c5d26c14c
Author: Nicholas Jiang <[email protected]>
AuthorDate: Fri Sep 1 09:36:45 2023 +0800

    [HUDI-6066] HoodieTableSource supports parquet predicate push down (#8437)
---
 .../apache/hudi/source/ExpressionPredicates.java   | 654 +++++++++++++++++++++
 .../org/apache/hudi/table/HoodieTableSource.java   |  18 +-
 .../apache/hudi/table/format/RecordIterators.java  |  60 +-
 .../hudi/table/format/cdc/CdcInputFormat.java      |  11 +-
 .../table/format/cow/CopyOnWriteInputFormat.java   |   9 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |  17 +-
 .../hudi/source/TestExpressionPredicates.java      | 167 ++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  14 +
 .../apache/hudi/table/TestHoodieTableSource.java   |  23 +
 .../table/format/cow/ParquetSplitReaderUtil.java   |  10 +-
 .../reader/ParquetColumnarRowSplitReader.java      |  10 +-
 .../table/format/cow/ParquetSplitReaderUtil.java   |  10 +-
 .../reader/ParquetColumnarRowSplitReader.java      |  10 +-
 .../table/format/cow/ParquetSplitReaderUtil.java   |  10 +-
 .../reader/ParquetColumnarRowSplitReader.java      |  10 +-
 .../table/format/cow/ParquetSplitReaderUtil.java   |  10 +-
 .../reader/ParquetColumnarRowSplitReader.java      |  10 +-
 .../table/format/cow/ParquetSplitReaderUtil.java   |  10 +-
 .../reader/ParquetColumnarRowSplitReader.java      |  10 +-
 19 files changed, 1037 insertions(+), 36 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
new file mode 100644
index 00000000000..046e4b739ad
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.util.ExpressionUtils.getValueFromLiteral;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.io.api.Binary.fromConstantByteArray;
+import static org.apache.parquet.io.api.Binary.fromString;
+
+/**
+ * Tool to convert {@link org.apache.flink.table.expressions.ResolvedExpression}s into parquet filter predicates.
+ */
+public class ExpressionPredicates {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExpressionPredicates.class);
+
+  /**
+   * Converts specific call expression list to the predicate list.
+   *
+   * @param resolvedExpressions The resolved expressions to convert.
+   * @return The converted predicates.
+   */
+  public static List<Predicate> fromExpression(List<ResolvedExpression> resolvedExpressions) {
+    return resolvedExpressions.stream()
+        .map(e -> fromExpression((CallExpression) e))
+        .collect(Collectors.toList());
+  }
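
For orientation, a minimal sketch of the conversion entry point (using the single-expression overload just below; the column f_int and the literal 10 mirror the TestExpressionPredicates case later in this commit):

    // a resolved Flink filter: f_int > 10
    FieldReferenceExpression fieldReference =
        new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0);
    ValueLiteralExpression valueLiteral = new ValueLiteralExpression(10);
    List<ResolvedExpression> children = Arrays.asList(fieldReference, valueLiteral);
    CallExpression greaterThan = new CallExpression(
        BuiltInFunctionDefinitions.GREATER_THAN, children, DataTypes.BOOLEAN());
    // yields a GreaterThan predicate; its filter() equals FilterApi.gt(intColumn("f_int"), 10)
    Predicate predicate = ExpressionPredicates.fromExpression(greaterThan);
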
+
+  /**
+   * Converts specific call expression to the predicate.
+   *
+   * <p>Two steps to bind the call:
+   * 1. map the predicate instance;
+   * 2. bind the field reference;
+   *
+   * <p>Normalize the expression to simplify the subsequent decision logic:
+   * always put the literal expression in the RHS.
+   *
+   * @param callExpression The call expression to convert.
+   * @return The converted predicate.
+   */
+  public static Predicate fromExpression(CallExpression callExpression) {
+    FunctionDefinition functionDefinition = callExpression.getFunctionDefinition();
+    List<Expression> childExpressions = callExpression.getChildren();
+
+    boolean normalized = childExpressions.get(0) instanceof FieldReferenceExpression;
+
+    if (BuiltInFunctionDefinitions.NOT.equals(functionDefinition)) {
+      Not predicate = Not.getInstance();
+      Predicate childPredicate = fromExpression((CallExpression) childExpressions.get(0));
+      return predicate.bindPredicate(childPredicate);
+    }
+
+    if (BuiltInFunctionDefinitions.AND.equals(functionDefinition)) {
+      And predicate = And.getInstance();
+      Predicate predicate1 = fromExpression((CallExpression) childExpressions.get(0));
+      Predicate predicate2 = fromExpression((CallExpression) childExpressions.get(1));
+      return predicate.bindPredicates(predicate1, predicate2);
+    }
+
+    if (BuiltInFunctionDefinitions.OR.equals(functionDefinition)) {
+      Or predicate = Or.getInstance();
+      Predicate predicate1 = fromExpression((CallExpression) childExpressions.get(0));
+      Predicate predicate2 = fromExpression((CallExpression) childExpressions.get(1));
+      return predicate.bindPredicates(predicate1, predicate2);
+    }
+
+    if (BuiltInFunctionDefinitions.IS_NULL.equals(functionDefinition)
+        || BuiltInFunctionDefinitions.IS_NOT_NULL.equals(functionDefinition)
+        || childExpressions.stream().anyMatch(e -> e instanceof ValueLiteralExpression
+        && getValueFromLiteral((ValueLiteralExpression) e) == null)) {
+      return AlwaysNull.getInstance();
+    }
+
+    // handle IN specifically
+    if (BuiltInFunctionDefinitions.IN.equals(functionDefinition)) {
+      checkState(normalized, "The IN expression expects to be normalized");
+      In in = In.getInstance();
+      FieldReferenceExpression fieldReference = (FieldReferenceExpression) childExpressions.get(0);
+      List<ValueLiteralExpression> valueLiterals = IntStream.range(1, childExpressions.size())
+          .mapToObj(index -> (ValueLiteralExpression) childExpressions.get(index))
+          .collect(Collectors.toList());
+      return in.bindValueLiterals(valueLiterals).bindFieldReference(fieldReference);
+    }
+
+    ColumnPredicate predicate;
+    // handle binary operators
+    if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+      predicate = Equals.getInstance();
+    } else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+      predicate = NotEquals.getInstance();
+    } else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+      predicate = normalized ? LessThan.getInstance() : GreaterThan.getInstance();
+    } else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+      predicate = normalized ? GreaterThan.getInstance() : LessThan.getInstance();
+    } else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+      predicate = normalized ? LessThanOrEqual.getInstance() : GreaterThanOrEqual.getInstance();
+    } else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+      predicate = normalized ? GreaterThanOrEqual.getInstance() : LessThanOrEqual.getInstance();
+    } else {
+      throw new AssertionError("Unexpected function definition " + functionDefinition);
+    }
+    FieldReferenceExpression fieldReference = normalized
+        ? (FieldReferenceExpression) childExpressions.get(0)
+        : (FieldReferenceExpression) childExpressions.get(1);
+    ValueLiteralExpression valueLiteral = normalized
+        ? (ValueLiteralExpression) childExpressions.get(1)
+        : (ValueLiteralExpression) childExpressions.get(0);
+    return predicate.bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+  }
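
Note that the normalization above mirrors the comparison operator rather than swapping the operands, so a filter written with the literal on the left pushes down the same parquet predicate (sketch, reusing the names from the previous example):

    // literal on the LHS: 10 < f_int
    List<ResolvedExpression> reversedChildren = Arrays.asList(valueLiteral, fieldReference);
    CallExpression reversed = new CallExpression(
        BuiltInFunctionDefinitions.LESS_THAN, reversedChildren, DataTypes.BOOLEAN());
    // bound as GreaterThan, i.e. the same parquet filter as f_int > 10
    Predicate flipped = ExpressionPredicates.fromExpression(reversed);
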
+
+  // --------------------------------------------------------------------------------------------
+  // Classes to define predicates
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * A filter predicate that can be evaluated by the FileInputFormat.
+   */
+  public interface Predicate extends Serializable {
+
+    /**
+     * Returns the filter criteria deciding which records to keep when loading data from a parquet file.
+     *
+     * @return A parquet filter predicate.
+     */
+    FilterPredicate filter();
+  }
+
+  /**
+   * Column predicate which depends on the given field.
+   */
+  public abstract static class ColumnPredicate implements Predicate {
+
+    // referenced field type
+    protected LogicalType literalType;
+
+    // referenced field name
+    protected String columnName;
+
+    // the constant literal value
+    protected Serializable literal;
+
+    /**
+     * Binds field reference to create a column predicate.
+     *
+     * @param fieldReference The field reference to bind.
+     * @return A column predicate.
+     */
+    public ColumnPredicate bindFieldReference(FieldReferenceExpression fieldReference) {
+      this.literalType = fieldReference.getOutputDataType().getLogicalType();
+      this.columnName = fieldReference.getName();
+      return this;
+    }
+
+    /**
+     * Binds value literal to create a column predicate.
+     *
+     * @param valueLiteral The value literal to bind.
+     * @return A column predicate.
+     */
+    public ColumnPredicate bindValueLiteral(ValueLiteralExpression valueLiteral) {
+      Object literalObject = getValueFromLiteral(valueLiteral);
+      // validate that literal is serializable
+      if (literalObject instanceof Serializable) {
+        this.literal = (Serializable) literalObject;
+      } else {
+        LOG.warn("Encountered a non-serializable literal. "
+            + "Cannot push predicate with value literal [{}] into FileInputFormat. "
+            + "This is a bug and should be reported.", valueLiteral);
+        this.literal = null;
+      }
+      return this;
+    }
+
+    @Override
+    public FilterPredicate filter() {
+      return toParquetPredicate(getFunctionDefinition(), literalType, columnName, literal);
+    }
+
+    /**
+     * Returns function definition of predicate.
+     *
+     * @return A function definition of predicate.
+     */
+    public FunctionDefinition getFunctionDefinition() {
+      return null;
+    }
+  }
+
+  /**
+   * An EQUALS predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class Equals extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Returns an EQUALS predicate.
+     *
+     * @return An EQUALS predicate instance.
+     */
+    public static Equals getInstance() {
+      return new Equals();
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.EQUALS;
+    }
+
+    @Override
+    public String toString() {
+      return columnName + " = " + literal;
+    }
+  }
+
+  /**
+   * A NOT_EQUALS predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class NotEquals extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Returns a NOT_EQUALS predicate.
+     *
+     * @return A NOT_EQUALS predicate instance.
+     */
+    public static NotEquals getInstance() {
+      return new NotEquals();
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.NOT_EQUALS;
+    }
+
+    @Override
+    public String toString() {
+      return columnName + " != " + literal;
+    }
+  }
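
The two binders touch disjoint fields, so a column predicate can also be assembled by hand in either order, which is how the unit tests below exercise these classes:

    Predicate equalsPredicate = Equals.getInstance()
        .bindValueLiteral(valueLiteral)
        .bindFieldReference(fieldReference);
    // equalsPredicate.filter() equals FilterApi.eq(intColumn("f_int"), 10)
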
+
+  /**
+   * A LESS_THAN predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class LessThan extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Returns a LESS_THAN predicate.
+     *
+     * @return A LESS_THAN predicate instance.
+     */
+    public static LessThan getInstance() {
+      return new LessThan();
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.LESS_THAN;
+    }
+
+    @Override
+    public String toString() {
+      return columnName + " < " + literal;
+    }
+  }
+
+  /**
+   * A GREATER_THAN predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class GreaterThan extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Returns a GREATER_THAN predicate.
+     *
+     * @return A GREATER_THAN predicate instance.
+     */
+    public static GreaterThan getInstance() {
+      return new GreaterThan();
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.GREATER_THAN;
+    }
+
+    @Override
+    public String toString() {
+      return columnName + " > " + literal;
+    }
+  }
+
+  /**
+   * A LESS_THAN_OR_EQUAL predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class LessThanOrEqual extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Returns a LESS_THAN_OR_EQUAL predicate.
+     *
+     * @return A LESS_THAN_OR_EQUAL predicate instance.
+     */
+    public static LessThanOrEqual getInstance() {
+      return new LessThanOrEqual();
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL;
+    }
+
+    @Override
+    public String toString() {
+      return columnName + " <= " + literal;
+    }
+  }
+
+  /**
+   * A GREATER_THAN_OR_EQUAL predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class GreaterThanOrEqual extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Returns a GREATER_THAN_OR_EQUAL predicate.
+     *
+     * @return A GREATER_THAN_OR_EQUAL predicate instance.
+     */
+    public static GreaterThanOrEqual getInstance() {
+      return new GreaterThanOrEqual();
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL;
+    }
+
+    @Override
+    public String toString() {
+      return columnName + " >= " + literal;
+    }
+  }
+
+  /**
+   * An IN predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class In extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(In.class);
+
+    private static final int IN_PREDICATE_LIMIT = 200;
+
+    // the constant literal values
+    protected List<Serializable> literals;
+
+    /**
+     * Returns an IN predicate.
+     *
+     * @return An IN predicate instance.
+     */
+    public static In getInstance() {
+      return new In();
+    }
+
+    /**
+     * Binds value literals to create an IN predicate.
+     *
+     * @param valueLiterals The value literals to bind.
+     * @return An IN predicate.
+     */
+    public ColumnPredicate bindValueLiterals(List<ValueLiteralExpression> valueLiterals) {
+      this.literals = valueLiterals.stream().map(valueLiteral -> {
+        Object literalObject = getValueFromLiteral(valueLiteral);
+        // validate that literal is serializable
+        if (literalObject instanceof Serializable) {
+          return (Serializable) literalObject;
+        } else {
+          LOG.warn("Encountered a non-serializable literal. "
+              + "Cannot push predicate with value literal [{}] into FileInputFormat. "
+              + "This is a bug and should be reported.", valueLiteral);
+          return null;
+        }
+      }).collect(Collectors.toList());
+      return this;
+    }
+
+    @Override
+    public FilterPredicate filter() {
+      if (literals.stream().anyMatch(Objects::isNull) || literals.size() > IN_PREDICATE_LIMIT) {
+        return null;
+      }
+
+      FilterPredicate filterPredicate = null;
+      for (Serializable literal : literals) {
+        FilterPredicate predicate = toParquetPredicate(BuiltInFunctionDefinitions.EQUALS, literalType, columnName, literal);
+        if (predicate != null) {
+          filterPredicate = filterPredicate == null ? predicate : or(filterPredicate, predicate);
+        }
+      }
+      return filterPredicate;
+    }
+
+    @Override
+    public String toString() {
+      return columnName + " IN(" + Arrays.toString(literals.toArray()) + ")";
+    }
+  }
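
Since parquet's FilterApi has no native IN operator, the predicate above expands into a disjunction of equality filters, and it degrades safely: if any literal is null, or more than IN_PREDICATE_LIMIT (200) values are given, filter() returns null and nothing is pushed down. A sketch with illustrative values:

    // f_int IN (11, 12)  expands to  or(eq(f_int, 11), eq(f_int, 12))
    Predicate inPredicate = In.getInstance()
        .bindValueLiterals(Arrays.asList(
            new ValueLiteralExpression(11), new ValueLiteralExpression(12)))
        .bindFieldReference(fieldReference);
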
+
+  /**
+   * A special predicate for which no parquet filter can be generated,
+   * e.g. for IS_NULL, IS_NOT_NULL or null literals.
+   */
+  public static class AlwaysNull implements Predicate {
+
+    private static final long serialVersionUID = 1L;
+
+    public static AlwaysNull getInstance() {
+      return new AlwaysNull();
+    }
+
+    @Override
+    public FilterPredicate filter() {
+      return null;
+    }
+  }
+
+  /**
+   * A NOT predicate to negate a predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class Not implements Predicate {
+
+    private static final long serialVersionUID = 1L;
+
+    private Predicate predicate;
+
+    /**
+     * Returns a NOT predicate.
+     */
+    public static Not getInstance() {
+      return new Not();
+    }
+
+    /**
+     * Binds predicate to create a NOT predicate.
+     *
+     * @param predicate The predicate to negate.
+     * @return A NOT predicate.
+     */
+    public Predicate bindPredicate(Predicate predicate) {
+      this.predicate = predicate;
+      return this;
+    }
+
+    @Override
+    public FilterPredicate filter() {
+      return not(predicate.filter());
+    }
+
+    @Override
+    public String toString() {
+      return "NOT(" + predicate.toString() + ")";
+    }
+  }
+
+  /**
+   * An AND predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class And implements Predicate {
+
+    private static final long serialVersionUID = 1L;
+
+    private Predicate[] predicates;
+
+    /**
+     * Returns an AND predicate.
+     */
+    public static And getInstance() {
+      return new And();
+    }
+
+    /**
+     * Binds predicates to create an AND predicate.
+     *
+     * @param predicates The conjunctive predicates.
+     * @return An AND predicate.
+     */
+    public Predicate bindPredicates(Predicate... predicates) {
+      this.predicates = predicates;
+      return this;
+    }
+
+    @Override
+    public FilterPredicate filter() {
+      return and(predicates[0].filter(), predicates[1].filter());
+    }
+
+    @Override
+    public String toString() {
+      return "AND(" + Arrays.toString(predicates) + ")";
+    }
+  }
+
+  /**
+   * An OR predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class Or implements Predicate {
+
+    private static final long serialVersionUID = 1L;
+
+    private Predicate[] predicates;
+
+    /**
+     * Returns an OR predicate.
+     */
+    public static Or getInstance() {
+      return new Or();
+    }
+
+    /**
+     * Binds predicates to create an OR predicate.
+     *
+     * @param predicates The disjunctive predicates.
+     * @return An OR predicate.
+     */
+    public Predicate bindPredicates(Predicate... predicates) {
+      this.predicates = predicates;
+      return this;
+    }
+
+    @Override
+    public FilterPredicate filter() {
+      return or(predicates[0].filter(), predicates[1].filter());
+    }
+
+    @Override
+    public String toString() {
+      return "OR(" + Arrays.toString(predicates) + ")";
+    }
+  }
+
+  private static FilterPredicate toParquetPredicate(FunctionDefinition functionDefinition, LogicalType literalType, String columnName, Serializable literal) {
+    switch (literalType.getTypeRoot()) {
+      case BOOLEAN:
+        return predicateSupportsEqNotEq(functionDefinition, booleanColumn(columnName), (Boolean) literal);
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+      case TIME_WITHOUT_TIME_ZONE:
+        return predicateSupportsLtGt(functionDefinition, intColumn(columnName), (Integer) literal);
+      case BIGINT:
+      case DATE:
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        return predicateSupportsLtGt(functionDefinition, longColumn(columnName), (Long) literal);
+      case FLOAT:
+        return predicateSupportsLtGt(functionDefinition, floatColumn(columnName), (Float) literal);
+      case DOUBLE:
+        return predicateSupportsLtGt(functionDefinition, doubleColumn(columnName), (Double) literal);
+      case BINARY:
+      case VARBINARY:
+        return predicateSupportsLtGt(functionDefinition, binaryColumn(columnName), fromConstantByteArray((byte[]) literal));
+      case CHAR:
+      case VARCHAR:
+        return predicateSupportsLtGt(functionDefinition, binaryColumn(columnName), fromString((String) literal));
+      default:
+        return null;
+    }
+  }
+
+  private static <T extends Comparable<T>, C extends Operators.Column<T> & Operators.SupportsEqNotEq> FilterPredicate predicateSupportsEqNotEq(
+      FunctionDefinition functionDefinition, C column, T value) {
+    if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+      return eq(column, value);
+    } else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+      return notEq(column, value);
+    } else {
+      throw new AssertionError("Unexpected function definition " + functionDefinition);
+    }
+  }
+
+  private static <T extends Comparable<T>, C extends Operators.Column<T> & Operators.SupportsLtGt> FilterPredicate predicateSupportsLtGt(FunctionDefinition functionDefinition, C column, T value) {
+    if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+      return eq(column, value);
+    } else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+      return notEq(column, value);
+    } else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+      return lt(column, value);
+    } else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+      return gt(column, value);
+    } else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+      return ltEq(column, value);
+    } else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+      return gtEq(column, value);
+    } else {
+      throw new AssertionError("Unexpected function definition " + functionDefinition);
+    }
+  }
+}
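
toParquetPredicate above is where the Flink-to-parquet type mapping lives: TINYINT/SMALLINT/INTEGER/TIME map to int columns, BIGINT/DATE/TIMESTAMP to long columns, CHAR/VARCHAR to binary columns via UTF-8, and any other type (DECIMAL, for instance) yields null so that conjunct is simply not pushed. As a sketch, a filter such as uuid > 'id2' AND age > 30 (the columns of the ITTestHoodieDataSource case further down, assuming a STRING uuid and an INT age) reaches the reader as:

    FilterPredicate pushed = FilterApi.and(
        FilterApi.gt(FilterApi.binaryColumn("uuid"), Binary.fromString("id2")),
        FilterApi.gt(FilterApi.intColumn("age"), 30));
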
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 540f1a8c79d..03eb3205e8c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -40,6 +40,8 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.ExpressionPredicates;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.source.FileIndex;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
@@ -134,6 +136,7 @@ public class HoodieTableSource implements
 
   private int[] requiredPos;
   private long limit;
+  private List<Predicate> predicates;
   private DataPruner dataPruner;
   private PartitionPruners.PartitionPruner partitionPruner;
   private int dataBucket;
@@ -145,7 +148,7 @@ public class HoodieTableSource implements
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf) {
-    this(schema, path, partitionKeys, defaultPartName, conf, null, null, PrimaryKeyPruners.BUCKET_ID_NO_PRUNING, null, null, null, null);
+    this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, PrimaryKeyPruners.BUCKET_ID_NO_PRUNING, null, null, null, null);
   }
 
   public HoodieTableSource(
@@ -154,6 +157,7 @@ public class HoodieTableSource implements
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf,
+      @Nullable List<Predicate> predicates,
       @Nullable DataPruner dataPruner,
       @Nullable PartitionPruners.PartitionPruner partitionPruner,
       int dataBucket,
@@ -167,6 +171,7 @@ public class HoodieTableSource implements
     this.partitionKeys = partitionKeys;
     this.defaultPartName = defaultPartName;
    this.conf = conf;
+    this.predicates = predicates == null ? Collections.emptyList() : predicates;
     this.dataPruner = dataPruner;
     this.partitionPruner = partitionPruner;
     this.dataBucket = dataBucket;
@@ -230,7 +235,7 @@ public class HoodieTableSource implements
   @Override
   public DynamicTableSource copy() {
     return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
-        conf, dataPruner, partitionPruner, dataBucket, requiredPos, limit, metaClient, internalSchemaManager);
+        conf, predicates, dataPruner, partitionPruner, dataBucket, requiredPos, limit, metaClient, internalSchemaManager);
   }
 
   @Override
@@ -242,6 +247,7 @@ public class HoodieTableSource implements
   public Result applyFilters(List<ResolvedExpression> filters) {
     List<ResolvedExpression> simpleFilters = filterSimpleCallExpression(filters);
     Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitFilters = splitExprByPartitionCall(simpleFilters, this.partitionKeys, this.tableRowType);
+    this.predicates = ExpressionPredicates.fromExpression(splitFilters.f0);
     this.dataPruner = DataPruner.newInstance(splitFilters.f0);
     this.partitionPruner = cratePartitionPruner(splitFilters.f1);
     this.dataBucket = getDataBucket(splitFilters.f0);
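
applyFilters is the planner hook where the new wiring happens: the non-partition conjuncts in splitFilters.f0 now feed the parquet predicates as well as the existing data pruner, while the partition conjuncts in splitFilters.f1 keep going to the partition pruner. For example, the ITTestHoodieDataSource case added by this commit pushes the conjuncts of

    select * from t1 where uuid > 'id2' and age > 30 and ts > '1970-01-01 00:00:04'

down to the parquet readers.
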
@@ -474,6 +480,7 @@ public class HoodieTableSource implements
           // is not very stable.
           .fieldTypes(rowDataType.getChildren())
           .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+          .predicates(this.predicates)
           .limit(this.limit)
           .emitDelete(false) // the change logs iterator can handle the DELETE records
           .build();
@@ -500,6 +507,7 @@ public class HoodieTableSource implements
           // is not very stable.
           .fieldTypes(rowDataType.getChildren())
           .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+          .predicates(this.predicates)
           .limit(this.limit)
           .emitDelete(emitDelete)
           .internalSchemaManager(internalSchemaManager)
           .build();
@@ -530,6 +538,7 @@ public class HoodieTableSource implements
         this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
         this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
         this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
+        this.predicates,
         this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
         getParquetConf(this.conf, this.hadoopConf),
         this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
@@ -600,6 +609,11 @@ public class HoodieTableSource implements
     return fileIndex.getFilesInPartitions();
   }
 
+  @VisibleForTesting
+  public List<Predicate> getPredicates() {
+    return predicates;
+  }
+
   @VisibleForTesting
   public DataPruner getDataPruner() {
     return dataPruner;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index b6be67df55a..711ed446713 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -21,17 +21,29 @@ package org.apache.hudi.table.format;
 
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
 import org.apache.hudi.util.RowDataProjection;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.SerializationUtil;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.hadoop.ParquetInputFormat.FILTER_PREDICATE;
+import static org.apache.parquet.hadoop.ParquetInputFormat.UNBOUND_RECORD_FILTER;
+
 /**
  * Factory class for record iterators.
  */
@@ -49,7 +61,17 @@ public abstract class RecordIterators {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      List<Predicate> predicates) throws IOException {
+    FilterPredicate filterPredicate = getFilterPredicate(conf);
+    for (Predicate predicate : predicates) {
+      FilterPredicate filter = predicate.filter();
+      if (filter != null) {
+        filterPredicate = filterPredicate == null ? filter : and(filterPredicate, filter);
+      }
+    }
+    UnboundRecordFilter recordFilter = getUnboundRecordFilterInstance(conf);
+
     InternalSchema mergeSchema = internalSchemaManager.getMergeSchema(path.getName());
     if (mergeSchema.isEmptySchema()) {
       return new ParquetSplitRecordIterator(
@@ -64,7 +86,9 @@ public abstract class RecordIterators {
           batchSize,
           path,
           splitStart,
-          splitLength));
+          splitLength,
+          filterPredicate,
+          recordFilter));
     } else {
       CastMap castMap = internalSchemaManager.getCastMap(mergeSchema, fieldNames, fieldTypes, selectedFields);
       Option<RowDataProjection> castProjection = castMap.toRowDataProjection(selectedFields);
@@ -80,7 +104,9 @@ public abstract class RecordIterators {
           batchSize,
           path,
           splitStart,
-          splitLength));
+          splitLength,
+          filterPredicate,
+          recordFilter));
       if (castProjection.isPresent()) {
         return new SchemaEvolvedRecordIterator(itr, castProjection.get());
       } else {
@@ -88,4 +114,32 @@ public abstract class RecordIterators {
       }
     }
   }
+
+  private static FilterPredicate getFilterPredicate(Configuration configuration) {
+    try {
+      return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) {
+    Class<?> clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
+    if (clazz == null) {
+      return null;
+    }
+
+    try {
+      UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance();
+
+      if (unboundRecordFilter instanceof Configurable) {
+        ((Configurable) unboundRecordFilter).setConf(configuration);
+      }
+
+      return unboundRecordFilter;
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new BadConfigurationException(
+          "could not instantiate unbound record filter class", e);
+    }
+  }
 }
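
Downstream, the combined FilterPredicate and the optional UnboundRecordFilter are handed to the version-specific ParquetColumnarRowSplitReader constructors, replacing the old getFilter(conf) lookup; condensed, the reader side of the handshake (shown in full in the per-Flink-version hunks below) is:

    // build a composite filter and let parquet prune whole row groups before scanning
    FilterCompat.Filter filter = FilterCompat.get(filterPredicate, recordFilter);
    List<BlockMetaData> blocks = RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), fileSchema);
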
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 124f8482b6f..154df81a0d4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
@@ -88,9 +89,10 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
       MergeOnReadTableState tableState,
       List<DataType> fieldTypes,
       String defaultPartName,
+      List<Predicate> predicates,
       long limit,
       boolean emitDelete) {
-    super(conf, tableState, fieldTypes, defaultPartName, limit, emitDelete, InternalSchemaManager.DISABLED);
+    super(conf, tableState, fieldTypes, defaultPartName, predicates, limit, emitDelete, InternalSchemaManager.DISABLED);
   }
 
   @Override
@@ -701,6 +703,11 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
       return this;
     }
 
+    public Builder predicates(List<Predicate> predicates) {
+      this.predicates = predicates;
+      return this;
+    }
+
     public Builder limit(long limit) {
       this.limit = limit;
       return this;
@@ -713,7 +720,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
 
     public CdcInputFormat build() {
       return new CdcInputFormat(conf, tableState, fieldTypes,
-          defaultPartName, limit, emitDelete);
+          defaultPartName, predicates, limit, emitDelete);
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index ec9b0b02a7b..5b365a58990 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -18,9 +18,9 @@
 
 package org.apache.hudi.table.format.cow;
 
-import java.util.Comparator;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.table.format.RecordIterators;
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -75,6 +76,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
   private final boolean hiveStylePartitioning;
   private final boolean utcTimestamp;
   private final SerializableConfiguration conf;
+  private final List<Predicate> predicates;
   private final long limit;
 
   private transient ClosableIterator<RowData> itr;
@@ -95,11 +97,13 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
       String partDefaultName,
       String partPathField,
      boolean hiveStylePartitioning,
+      List<Predicate> predicates,
       long limit,
       Configuration conf,
       boolean utcTimestamp,
       InternalSchemaManager internalSchemaManager) {
     super.setFilePaths(paths);
+    this.predicates = predicates;
     this.limit = limit;
     this.partDefaultName = partDefaultName;
     this.partPathField = partPathField;
@@ -135,7 +139,8 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
         2048,
         fileSplit.getPath(),
         fileSplit.getStart(),
-        fileSplit.getLength());
+        fileSplit.getLength(),
+        predicates);
 
     this.currentReadCount = 0L;
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 23a3934aeb9..f13098fc7c7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.InternalSchemaManager;
@@ -123,6 +124,9 @@ public class MergeOnReadInputFormat
    */
   private final int[] requiredPos;
+  // for predicate push down
+  private
final List<Predicate> predicates; + // for limit push down /** * Limit for the reader, -1 when the reading is not limited. @@ -152,6 +156,7 @@ public class MergeOnReadInputFormat MergeOnReadTableState tableState, List<DataType> fieldTypes, String defaultPartName, + List<Predicate> predicates, long limit, boolean emitDelete, InternalSchemaManager internalSchemaManager) { @@ -163,6 +168,7 @@ public class MergeOnReadInputFormat // Needs improvement: this requiredPos is only suitable for parquet reader, // because we need to this.requiredPos = tableState.getRequiredPositions(); + this.predicates = predicates; this.limit = limit; this.emitDelete = emitDelete; this.internalSchemaManager = internalSchemaManager; @@ -336,7 +342,8 @@ public class MergeOnReadInputFormat 2048, new org.apache.flink.core.fs.Path(path), 0, - Long.MAX_VALUE); // read the whole file + Long.MAX_VALUE, // read the whole file + predicates); } private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) { @@ -845,6 +852,7 @@ public class MergeOnReadInputFormat protected MergeOnReadTableState tableState; protected List<DataType> fieldTypes; protected String defaultPartName; + protected List<Predicate> predicates; protected long limit = -1; protected boolean emitDelete = false; protected InternalSchemaManager internalSchemaManager = InternalSchemaManager.DISABLED; @@ -869,6 +877,11 @@ public class MergeOnReadInputFormat return this; } + public Builder predicates(List<Predicate> predicates) { + this.predicates = predicates; + return this; + } + public Builder limit(long limit) { this.limit = limit; return this; @@ -886,7 +899,7 @@ public class MergeOnReadInputFormat public MergeOnReadInputFormat build() { return new MergeOnReadInputFormat(conf, tableState, fieldTypes, - defaultPartName, limit, emitDelete, internalSchemaManager); + defaultPartName, predicates, limit, emitDelete, internalSchemaManager); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java new file mode 100644 index 00000000000..97b06644266 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.source; + +import org.apache.hudi.source.ExpressionPredicates.And; +import org.apache.hudi.source.ExpressionPredicates.Equals; +import org.apache.hudi.source.ExpressionPredicates.GreaterThan; +import org.apache.hudi.source.ExpressionPredicates.GreaterThanOrEqual; +import org.apache.hudi.source.ExpressionPredicates.In; +import org.apache.hudi.source.ExpressionPredicates.LessThan; +import org.apache.hudi.source.ExpressionPredicates.LessThanOrEqual; +import org.apache.hudi.source.ExpressionPredicates.Not; +import org.apache.hudi.source.ExpressionPredicates.NotEquals; +import org.apache.hudi.source.ExpressionPredicates.Or; +import org.apache.hudi.source.ExpressionPredicates.Predicate; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.parquet.filter2.predicate.Operators.Eq; +import org.apache.parquet.filter2.predicate.Operators.Gt; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import org.apache.parquet.filter2.predicate.Operators.Lt; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.hudi.source.ExpressionPredicates.fromExpression; +import static org.apache.parquet.filter2.predicate.FilterApi.and; +import static org.apache.parquet.filter2.predicate.FilterApi.eq; +import static org.apache.parquet.filter2.predicate.FilterApi.gt; +import static org.apache.parquet.filter2.predicate.FilterApi.gtEq; +import static org.apache.parquet.filter2.predicate.FilterApi.intColumn; +import static org.apache.parquet.filter2.predicate.FilterApi.lt; +import static org.apache.parquet.filter2.predicate.FilterApi.ltEq; +import static org.apache.parquet.filter2.predicate.FilterApi.not; +import static org.apache.parquet.filter2.predicate.FilterApi.notEq; +import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Test cases for {@link ExpressionPredicates}. 
+ */ +public class TestExpressionPredicates { + + @Test + public void testFilterPredicateFromExpression() { + FieldReferenceExpression fieldReference = new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0); + ValueLiteralExpression valueLiteral = new ValueLiteralExpression(10); + List<ResolvedExpression> expressions = Arrays.asList(fieldReference, valueLiteral); + IntColumn intColumn = intColumn("f_int"); + + // equals + CallExpression equalsExpression = new CallExpression( + BuiltInFunctionDefinitions.EQUALS, expressions, DataTypes.BOOLEAN()); + Predicate predicate1 = Equals.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference); + Eq<Integer> eq = eq(intColumn, 10); + Predicate predicate2 = fromExpression(equalsExpression); + assertEquals(predicate1.toString(), predicate2.toString()); + assertEquals(eq, predicate2.filter()); + + // not equals + CallExpression notEqualsExpression = new CallExpression( + BuiltInFunctionDefinitions.NOT_EQUALS, expressions, DataTypes.BOOLEAN()); + Predicate predicate3 = NotEquals.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference); + Predicate predicate4 = fromExpression(notEqualsExpression); + assertEquals(predicate3.toString(), predicate4.toString()); + assertEquals(notEq(intColumn, 10), predicate4.filter()); + + // less than + CallExpression lessThanExpression = new CallExpression( + BuiltInFunctionDefinitions.LESS_THAN, expressions, DataTypes.BOOLEAN()); + Predicate predicate5 = LessThan.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference); + Lt<Integer> lt = lt(intColumn, 10); + Predicate predicate6 = fromExpression(lessThanExpression); + assertEquals(predicate5.toString(), predicate6.toString()); + assertEquals(lt, predicate6.filter()); + + // greater than + CallExpression greaterThanExpression = new CallExpression( + BuiltInFunctionDefinitions.GREATER_THAN, expressions, DataTypes.BOOLEAN()); + Predicate predicate7 = GreaterThan.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference); + Gt<Integer> gt = gt(intColumn, 10); + Predicate predicate8 = fromExpression(greaterThanExpression); + assertEquals(predicate7.toString(), predicate8.toString()); + assertEquals(gt, predicate8.filter()); + + // less than or equal + CallExpression lessThanOrEqualExpression = new CallExpression( + BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, expressions, DataTypes.BOOLEAN()); + Predicate predicate9 = LessThanOrEqual.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference); + Predicate predicate10 = fromExpression(lessThanOrEqualExpression); + assertEquals(predicate9.toString(), predicate10.toString()); + assertEquals(ltEq(intColumn, 10), predicate10.filter()); + + // greater than or equal + CallExpression greaterThanOrEqualExpression = new CallExpression( + BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, expressions, DataTypes.BOOLEAN()); + Predicate predicate11 = GreaterThanOrEqual.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference); + Predicate predicate12 = fromExpression(greaterThanOrEqualExpression); + assertEquals(predicate11.toString(), predicate12.toString()); + assertEquals(gtEq(intColumn, 10), predicate12.filter()); + + // in + ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(11); + ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression(12); + CallExpression inExpression = new CallExpression( + BuiltInFunctionDefinitions.IN, + Arrays.asList(fieldReference, valueLiteral1, 
valueLiteral2), + DataTypes.BOOLEAN()); + Predicate predicate13 = In.getInstance().bindValueLiterals(Arrays.asList(valueLiteral1, valueLiteral2)).bindFieldReference(fieldReference); + Predicate predicate14 = fromExpression(inExpression); + assertEquals(predicate13.toString(), predicate14.toString()); + assertEquals(or(eq(intColumn, 11), eq(intColumn, 12)), predicate14.filter()); + + // not + CallExpression notExpression = new CallExpression( + BuiltInFunctionDefinitions.NOT, + Collections.singletonList(equalsExpression), + DataTypes.BOOLEAN()); + Predicate predicate15 = Not.getInstance().bindPredicate(predicate2); + Predicate predicate16 = fromExpression(notExpression); + assertEquals(predicate15.toString(), predicate16.toString()); + assertEquals(not(eq), predicate16.filter()); + + // and + CallExpression andExpression = new CallExpression( + BuiltInFunctionDefinitions.AND, + Arrays.asList(lessThanExpression, greaterThanExpression), + DataTypes.BOOLEAN()); + Predicate predicate17 = And.getInstance().bindPredicates(predicate6, predicate8); + Predicate predicate18 = fromExpression(andExpression); + assertEquals(predicate17.toString(), predicate18.toString()); + assertEquals(and(lt, gt), predicate18.filter()); + + // or + CallExpression orExpression = new CallExpression( + BuiltInFunctionDefinitions.OR, + Arrays.asList(lessThanExpression, greaterThanExpression), + DataTypes.BOOLEAN()); + Predicate predicate19 = Or.getInstance().bindPredicates(predicate6, predicate8); + Predicate predicate20 = fromExpression(orExpression); + assertEquals(predicate19.toString(), predicate20.toString()); + assertEquals(or(lt, gt), predicate20.filter()); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 4ea92fbb845..40fb28619de 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -2036,6 +2036,20 @@ public class ITTestHoodieDataSource { assertRowsEquals(result4, expected4); } + @Test + void testReadWithParquetPredicatePushDown() { + TableEnvironment tableEnv = batchTableEnv; + String hoodieTableDDL = sql("t1").option(FlinkOptions.PATH, tempFile.getAbsolutePath()).end(); + tableEnv.executeSql(hoodieTableDDL); + execInsertSql(tableEnv, TestSQL.INSERT_T1); + // apply filters to push down predicates + List<Row> result = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1 where uuid > 'id2' and age > 30 and ts > '1970-01-01 00:00:04'").execute().collect()); + assertRowsEquals(result, "[" + + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index 2716dee2b1b..d0201620219 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -20,6 +20,7 @@ package org.apache.hudi.table; import 
org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.source.ExpressionPredicates; import org.apache.hudi.source.prune.DataPruner; import org.apache.hudi.source.prune.PrimaryKeyPruners; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; @@ -55,6 +56,7 @@ import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -65,6 +67,7 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -291,6 +294,26 @@ public class TestHoodieTableSource { assertThat(metaClient, is(tableSourceCopy.getMetaClient())); } + @Test + void testFilterPushDownWithParquetPredicates() { + HoodieTableSource tableSource = getEmptyStreamingSource(); + List<ResolvedExpression> expressions = new ArrayList<>(); + expressions.add(new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0)); + expressions.add(new ValueLiteralExpression(10)); + ResolvedExpression equalsExpression = new CallExpression( + BuiltInFunctionDefinitions.EQUALS, expressions, DataTypes.BOOLEAN()); + CallExpression greaterThanExpression = new CallExpression( + BuiltInFunctionDefinitions.GREATER_THAN, expressions, DataTypes.BOOLEAN()); + CallExpression orExpression = new CallExpression( + BuiltInFunctionDefinitions.OR, + Arrays.asList(equalsExpression, greaterThanExpression), + DataTypes.BOOLEAN()); + List<ResolvedExpression> expectedFilters = Arrays.asList(equalsExpression, greaterThanExpression, orExpression); + tableSource.applyFilters(expectedFilters); + String actualPredicates = tableSource.getPredicates().toString(); + assertEquals(ExpressionPredicates.fromExpression(expectedFilters).toString(), actualPredicates); + } + private HoodieTableSource getEmptyStreamingSource() { final String path = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(path); diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java index 4a9675d746a..622f499b64b 100644 --- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java +++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java @@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.filter.UnboundRecordFilter; +import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.InvalidSchemaException; import org.apache.parquet.schema.OriginalType; @@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil { int batchSize, Path path, long splitStart, - long splitLength) throws IOException { + long splitLength, + FilterPredicate filterPredicate, + UnboundRecordFilter 
recordFilter) throws IOException { List<String> selNonPartNames = Arrays.stream(selectedFields) .mapToObj(i -> fullFieldNames[i]) .filter(n -> !partitionSpec.containsKey(n)) @@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil { batchSize, new org.apache.hadoop.fs.Path(path.toUri()), splitStart, - splitLength); + splitLength, + filterPredicate, + recordFilter); } private static ColumnVector createVector( diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java index 6922ada9acf..9436305d295 100644 --- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java +++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java @@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -53,10 +55,10 @@ import java.util.stream.IntStream; import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader; import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector; +import static org.apache.parquet.filter2.compat.FilterCompat.get; import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; import static org.apache.parquet.format.converter.ParquetMetadataConverter.range; import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; -import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; /** * This reader is used to read a {@link VectorizedColumnBatch} from input split. 
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     this.utcTimestamp = utcTimestamp;
     this.batchSize = batchSize;
     // then we need to apply the predicate push down filter
     ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
     MessageType fileSchema = footer.getFileMetaData().getSchema();
-    FilterCompat.Filter filter = getFilter(conf);
+    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
     List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
     this.fileSchema = footer.getFileMetaData().getSchema();
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index a7bd063c746..7e611a5e2cb 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     List<String> selNonPartNames = Arrays.stream(selectedFields)
         .mapToObj(i -> fullFieldNames[i])
         .filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
         batchSize,
         new org.apache.hadoop.fs.Path(path.toUri()),
         splitStart,
-        splitLength);
+        splitLength,
+        filterPredicate,
+        recordFilter);
   }
 
   private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1826d5bea4c..4eb91988403 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
 
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
  * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     this.utcTimestamp = utcTimestamp;
     this.batchSize = batchSize;
     // then we need to apply the predicate push down filter
     ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
     MessageType fileSchema = footer.getFileMetaData().getSchema();
-    FilterCompat.Filter filter = getFilter(conf);
+    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
     List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
     this.fileSchema = footer.getFileMetaData().getSchema();
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e10f975bc29..3071ecc122d 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     List<String> selNonPartNames = Arrays.stream(selectedFields)
         .mapToObj(i -> fullFieldNames[i])
         .filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
         batchSize,
         new org.apache.hadoop.fs.Path(path.toUri()),
         splitStart,
-        splitLength);
+        splitLength,
+        filterPredicate,
+        recordFilter);
   }
 
   private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1872ec385b4..65912cef671 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
 
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
  * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     this.utcTimestamp = utcTimestamp;
     this.batchSize = batchSize;
     // then we need to apply the predicate push down filter
     ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
     MessageType fileSchema = footer.getFileMetaData().getSchema();
-    FilterCompat.Filter filter = getFilter(conf);
+    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
     List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
     this.fileSchema = footer.getFileMetaData().getSchema();
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e10f975bc29..3071ecc122d 100644
--- a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     List<String> selNonPartNames = Arrays.stream(selectedFields)
         .mapToObj(i -> fullFieldNames[i])
         .filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
         batchSize,
         new org.apache.hadoop.fs.Path(path.toUri()),
         splitStart,
-        splitLength);
+        splitLength,
+        filterPredicate,
+        recordFilter);
   }
 
   private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1872ec385b4..65912cef671 100644
--- a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
 
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
  * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     this.utcTimestamp = utcTimestamp;
     this.batchSize = batchSize;
     // then we need to apply the predicate push down filter
     ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
     MessageType fileSchema = footer.getFileMetaData().getSchema();
-    FilterCompat.Filter filter = getFilter(conf);
+    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
     List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
     this.fileSchema = footer.getFileMetaData().getSchema();
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e10f975bc29..3071ecc122d 100644
--- a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     List<String> selNonPartNames = Arrays.stream(selectedFields)
         .mapToObj(i -> fullFieldNames[i])
         .filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
         batchSize,
         new org.apache.hadoop.fs.Path(path.toUri()),
         splitStart,
-        splitLength);
+        splitLength,
+        filterPredicate,
+        recordFilter);
   }
 
   private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1872ec385b4..65912cef671 100644
--- a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
 
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
  * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     this.utcTimestamp = utcTimestamp;
     this.batchSize = batchSize;
     // then we need to apply the predicate push down filter
     ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
     MessageType fileSchema = footer.getFileMetaData().getSchema();
-    FilterCompat.Filter filter = getFilter(conf);
+    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
    List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
     this.fileSchema = footer.getFileMetaData().getSchema();
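
Note on the change pattern above: each bundled Flink profile (1.13.x through 1.17.x) receives the same two edits. The split readers stop recovering their filter from the Hadoop Configuration (hence the removed ParquetInputFormat.getFilter import) and instead accept the pushed-down FilterPredicate plus an optional UnboundRecordFilter, combining them with FilterCompat.get; the resulting filter drives RowGroupFilter.filterRowGroups, so row groups whose column statistics cannot satisfy the predicate are skipped before any pages are read. Below is a minimal standalone sketch of that flow against plain parquet-mr, not Hudi code: the file path and the int column "age" are hypothetical, and it relies on parquet-mr accepting a null record filter in FilterCompat.get (supplying both a predicate and a record filter at once is rejected).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.List;

import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public class PredicatePushDownSketch {

  public static void main(String[] args) throws IOException {
    // Hypothetical input; any parquet file with an int32 "age" column would do.
    Path path = new Path("/tmp/data.parquet");

    // Equivalent of the SQL predicate age > 18, built with parquet-mr's FilterApi.
    FilterPredicate agePredicate = FilterApi.gt(FilterApi.intColumn("age"), 18);

    // Same call shape as the patched readers: a null UnboundRecordFilter is allowed,
    // in which case only the row-group predicate applies.
    FilterCompat.Filter filter = FilterCompat.get(agePredicate, null);

    // Read the footer and drop row groups whose statistics rule out age > 18.
    ParquetMetadata footer = ParquetFileReader.readFooter(new Configuration(), path, NO_FILTER);
    MessageType fileSchema = footer.getFileMetaData().getSchema();
    List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
    System.out.println("Row groups kept: " + blocks.size() + " of " + footer.getBlocks().size());
  }
}

Passing the predicate explicitly, rather than reading it back out of the Configuration, also keeps each per-split reader independent of how its Configuration was populated, which is presumably why the parameter is threaded through every constructor above.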
