This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9fa00b7b154 [HUDI-6066] HoodieTableSource supports parquet predicate push down (#8437)
9fa00b7b154 is described below
commit 9fa00b7b1547ff46a1bea6d329e20dd702ff90b5
Author: Nicholas Jiang <[email protected]>
AuthorDate: Fri Sep 1 09:36:45 2023 +0800
[HUDI-6066] HoodieTableSource supports parquet predicate push down (#8437)
---
.../apache/hudi/source/ExpressionPredicates.java | 654 +++++++++++++++++++++
.../org/apache/hudi/table/HoodieTableSource.java | 18 +-
.../apache/hudi/table/format/RecordIterators.java | 60 +-
.../hudi/table/format/cdc/CdcInputFormat.java | 11 +-
.../table/format/cow/CopyOnWriteInputFormat.java | 9 +-
.../table/format/mor/MergeOnReadInputFormat.java | 17 +-
.../hudi/source/TestExpressionPredicates.java | 167 ++++++
.../apache/hudi/table/ITTestHoodieDataSource.java | 14 +
.../apache/hudi/table/TestHoodieTableSource.java | 23 +
.../table/format/cow/ParquetSplitReaderUtil.java | 10 +-
.../reader/ParquetColumnarRowSplitReader.java | 10 +-
.../table/format/cow/ParquetSplitReaderUtil.java | 10 +-
.../reader/ParquetColumnarRowSplitReader.java | 10 +-
.../table/format/cow/ParquetSplitReaderUtil.java | 10 +-
.../reader/ParquetColumnarRowSplitReader.java | 10 +-
.../table/format/cow/ParquetSplitReaderUtil.java | 10 +-
.../reader/ParquetColumnarRowSplitReader.java | 10 +-
.../table/format/cow/ParquetSplitReaderUtil.java | 10 +-
.../reader/ParquetColumnarRowSplitReader.java | 10 +-
19 files changed, 1037 insertions(+), 36 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
new file mode 100644
index 00000000000..046e4b739ad
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.util.ExpressionUtils.getValueFromLiteral;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.io.api.Binary.fromConstantByteArray;
+import static org.apache.parquet.io.api.Binary.fromString;
+
+/**
+ * Tool to convert {@link org.apache.flink.table.expressions.ResolvedExpression}s into parquet filter predicates.
+ */
+public class ExpressionPredicates {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExpressionPredicates.class);
+
+ /**
+ * Converts the given call expression list to a list of predicates.
+ *
+ * @param resolvedExpressions The resolved expressions to convert.
+ * @return The converted predicates.
+ */
+ public static List<Predicate> fromExpression(List<ResolvedExpression> resolvedExpressions) {
+ return resolvedExpressions.stream()
+ .map(e -> fromExpression((CallExpression) e))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Converts the given call expression to a predicate.
+ *
+ * <p>Two steps to bind the call:
+ * 1. map the predicate instance;
+ * 2. bind the field reference;
+ *
+ * <p>Normalize the expression to simplify the subsequent decision logic:
+ * always put the literal expression in the RHS.
+ *
+ * @param callExpression The call expression to convert.
+ * @return The converted predicate.
+ */
+ public static Predicate fromExpression(CallExpression callExpression) {
+ FunctionDefinition functionDefinition = callExpression.getFunctionDefinition();
+ List<Expression> childExpressions = callExpression.getChildren();
+
+ boolean normalized = childExpressions.get(0) instanceof FieldReferenceExpression;
+
+ if (BuiltInFunctionDefinitions.NOT.equals(functionDefinition)) {
+ Not predicate = Not.getInstance();
+ Predicate childPredicate = fromExpression((CallExpression) childExpressions.get(0));
+ return predicate.bindPredicate(childPredicate);
+ }
+
+ if (BuiltInFunctionDefinitions.AND.equals(functionDefinition)) {
+ And predicate = And.getInstance();
+ Predicate predicate1 = fromExpression((CallExpression) childExpressions.get(0));
+ Predicate predicate2 = fromExpression((CallExpression) childExpressions.get(1));
+ return predicate.bindPredicates(predicate1, predicate2);
+ }
+
+ if (BuiltInFunctionDefinitions.OR.equals(functionDefinition)) {
+ Or predicate = Or.getInstance();
+ Predicate predicate1 = fromExpression((CallExpression) childExpressions.get(0));
+ Predicate predicate2 = fromExpression((CallExpression) childExpressions.get(1));
+ return predicate.bindPredicates(predicate1, predicate2);
+ }
+
+ if (BuiltInFunctionDefinitions.IS_NULL.equals(functionDefinition)
+ || BuiltInFunctionDefinitions.IS_NOT_NULL.equals(functionDefinition)
+ || childExpressions.stream().anyMatch(e -> e instanceof ValueLiteralExpression
+ && getValueFromLiteral((ValueLiteralExpression) e) == null)) {
+ return AlwaysNull.getInstance();
+ }
+
+ // handle IN specifically
+ if (BuiltInFunctionDefinitions.IN.equals(functionDefinition)) {
+ checkState(normalized, "The IN expression expects to be normalized");
+ In in = In.getInstance();
+ FieldReferenceExpression fieldReference = (FieldReferenceExpression) childExpressions.get(0);
+ List<ValueLiteralExpression> valueLiterals = IntStream.range(1, childExpressions.size())
+ .mapToObj(index -> (ValueLiteralExpression) childExpressions.get(index))
+ .collect(Collectors.toList());
+ return in.bindValueLiterals(valueLiterals).bindFieldReference(fieldReference);
+ }
+
+ ColumnPredicate predicate;
+ // handle binary operators
+ if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+ predicate = Equals.getInstance();
+ } else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+ predicate = NotEquals.getInstance();
+ } else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+ predicate = normalized ? LessThan.getInstance() : GreaterThan.getInstance();
+ } else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+ predicate = normalized ? GreaterThan.getInstance() : LessThan.getInstance();
+ } else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+ predicate = normalized ? LessThanOrEqual.getInstance() : GreaterThanOrEqual.getInstance();
+ } else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+ predicate = normalized ? GreaterThanOrEqual.getInstance() : LessThanOrEqual.getInstance();
+ } else {
+ throw new AssertionError("Unexpected function definition " + functionDefinition);
+ }
+ FieldReferenceExpression fieldReference = normalized
+ ? (FieldReferenceExpression) childExpressions.get(0)
+ : (FieldReferenceExpression) childExpressions.get(1);
+ ValueLiteralExpression valueLiteral = normalized
+ ? (ValueLiteralExpression) childExpressions.get(1)
+ : (ValueLiteralExpression) childExpressions.get(0);
+ return predicate.bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Classes to define predicates
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * A filter predicate that can be evaluated by the FileInputFormat.
+ */
+ public interface Predicate extends Serializable {
+
+ /**
+ * Defines the criteria for which records to keep when loading data from a parquet file.
+ *
+ * @return A parquet filter predicate.
+ */
+ FilterPredicate filter();
+ }
+
+ /**
+ * Column predicate which depends on the given field.
+ */
+ public abstract static class ColumnPredicate implements Predicate {
+
+ // referenced field type
+ protected LogicalType literalType;
+
+ // referenced field name
+ protected String columnName;
+
+ // the constant literal value
+ protected Serializable literal;
+
+ /**
+ * Binds field reference to create a column predicate.
+ *
+ * @param fieldReference The field reference to bind.
+ * @return A column predicate.
+ */
+ public ColumnPredicate bindFieldReference(FieldReferenceExpression fieldReference) {
+ this.literalType = fieldReference.getOutputDataType().getLogicalType();
+ this.columnName = fieldReference.getName();
+ return this;
+ }
+
+ /**
+ * Binds value literal to create a column predicate.
+ *
+ * @param valueLiteral The value literal to bind.
+ * @return A column predicate.
+ */
+ public ColumnPredicate bindValueLiteral(ValueLiteralExpression valueLiteral) {
+ Object literalObject = getValueFromLiteral(valueLiteral);
+ // validate that literal is serializable
+ if (literalObject instanceof Serializable) {
+ this.literal = (Serializable) literalObject;
+ } else {
+ LOG.warn("Encountered a non-serializable literal. " + "Cannot push
predicate with value literal [{}] into FileInputFormat. " + "This is a bug and
should be reported.", valueLiteral);
+ this.literal = null;
+ }
+ return this;
+ }
+
+ @Override
+ public FilterPredicate filter() {
+ return toParquetPredicate(getFunctionDefinition(), literalType, columnName, literal);
+ }
+
+ /**
+ * Returns the function definition of this predicate.
+ *
+ * @return The function definition of this predicate.
+ */
+ public FunctionDefinition getFunctionDefinition() {
+ return null;
+ }
+ }
+
+ /**
+ * An EQUALS predicate that can be evaluated by the FileInputFormat.
+ */
+ public static class Equals extends ColumnPredicate {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Returns an EQUALS predicate.
+ *
+ * @return An EQUALS predicate instance.
+ */
+ public static Equals getInstance() {
+ return new Equals();
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition() {
+ return BuiltInFunctionDefinitions.EQUALS;
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " = " + literal;
+ }
+ }
+
+ /**
+ * A NOT_EQUALS predicate that can be evaluated by the FileInputFormat.
+ */
+ public static class NotEquals extends ColumnPredicate {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Returns a NOT_EQUALS predicate.
+ *
+ * @return A NOT_EQUALS predicate instance.
+ */
+ public static NotEquals getInstance() {
+ return new NotEquals();
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition() {
+ return BuiltInFunctionDefinitions.NOT_EQUALS;
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " != " + literal;
+ }
+ }
+
+ /**
+ * A LESS_THAN predicate that can be evaluated by the FileInputFormat.
+ */
+ public static class LessThan extends ColumnPredicate {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Returns a LESS_THAN predicate.
+ *
+ * @return A LESS_THAN predicate instance.
+ */
+ public static LessThan getInstance() {
+ return new LessThan();
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition() {
+ return BuiltInFunctionDefinitions.LESS_THAN;
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " < " + literal;
+ }
+ }
+
+ /**
+ * A GREATER_THAN predicate that can be evaluated by the FileInputFormat.
+ */
+ public static class GreaterThan extends ColumnPredicate {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Returns a GREATER_THAN predicate.
+ *
+ * @return A GREATER_THAN predicate instance.
+ */
+ public static GreaterThan getInstance() {
+ return new GreaterThan();
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition() {
+ return BuiltInFunctionDefinitions.GREATER_THAN;
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " > " + literal;
+ }
+ }
+
+ /**
+ * A LESS_THAN_OR_EQUAL predicate that can be evaluated by the FileInputFormat.
+ */
+ public static class LessThanOrEqual extends ColumnPredicate {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Returns a LESS_THAN_OR_EQUAL predicate.
+ *
+ * @return A LESS_THAN_OR_EQUAL predicate instance.
+ */
+ public static LessThanOrEqual getInstance() {
+ return new LessThanOrEqual();
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition() {
+ return BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL;
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " <= " + literal;
+ }
+ }
+
+ /**
+ * A GREATER_THAN_OR_EQUAL predicate that can be evaluated by the FileInputFormat.
+ */
+ public static class GreaterThanOrEqual extends ColumnPredicate {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Returns a GREATER_THAN_OR_EQUAL predicate.
+ *
+ * @return A GREATER_THAN_OR_EQUAL predicate instance.
+ */
+ public static GreaterThanOrEqual getInstance() {
+ return new GreaterThanOrEqual();
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition() {
+ return BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL;
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " >= " + literal;
+ }
+ }
+
+ /**
+ * An IN predicate that can be evaluated by the FileInputFormat.
+ */
+ public static class In extends ColumnPredicate {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(In.class);
+
+ private static final int IN_PREDICATE_LIMIT = 200;
+
+ // the constant literal values
+ protected List<Serializable> literals;
+
+ /**
+ * Returns an IN predicate.
+ *
+ * @return An IN predicate instance.
+ */
+ public static In getInstance() {
+ return new In();
+ }
+
+ /**
+ * Binds value literals to create an IN predicate.
+ *
+ * @param valueLiterals The value literals to bind.
+ * @return An IN predicate.
+ */
+ public ColumnPredicate bindValueLiterals(List<ValueLiteralExpression> valueLiterals) {
+ this.literals = valueLiterals.stream().map(valueLiteral -> {
+ Object literalObject = getValueFromLiteral(valueLiteral);
+ // validate that literal is serializable
+ if (literalObject instanceof Serializable) {
+ return (Serializable) literalObject;
+ } else {
+ LOG.warn("Encountered a non-serializable literal. " + "Cannot push
predicate with value literal [{}] into FileInputFormat. " + "This is a bug and
should be reported.", valueLiteral);
+ return null;
+ }
+ }).collect(Collectors.toList());
+ return this;
+ }
+
+ @Override
+ public FilterPredicate filter() {
+ if (literals.stream().anyMatch(Objects::isNull) || literals.size() > IN_PREDICATE_LIMIT) {
+ return null;
+ }
+
+ FilterPredicate filterPredicate = null;
+ for (Serializable literal : literals) {
+ FilterPredicate predicate = toParquetPredicate(BuiltInFunctionDefinitions.EQUALS, literalType, columnName, literal);
+ if (predicate != null) {
+ filterPredicate = filterPredicate == null ? predicate : or(filterPredicate, predicate);
+ }
+ }
+ return filterPredicate;
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " IN(" + Arrays.toString(literals.toArray()) + ")";
+ }
+ }
+
+ /**
+ * A special predicate for expressions that cannot be pushed down; its filter is always null.
+ */
+ public static class AlwaysNull implements Predicate {
+
+ private static final long serialVersionUID = 1L;
+
+ public static AlwaysNull getInstance() {
+ return new AlwaysNull();
+ }
+
+ @Override
+ public FilterPredicate filter() {
+ return null;
+ }
+ }
+
+ /**
+ * A NOT predicate to negate a predicate that can be evaluated by the FileInputFormat.
+ */
+ public static class Not implements Predicate {
+
+ private static final long serialVersionUID = 1L;
+
+ private Predicate predicate;
+
+ /**
+ * Returns a NOT predicate.
+ */
+ public static Not getInstance() {
+ return new Not();
+ }
+
+ /**
+ * Binds predicate to create a NOT predicate.
+ *
+ * @param predicate The predicate to negate.
+ * @return A NOT predicate.
+ */
+ public Predicate bindPredicate(Predicate predicate) {
+ this.predicate = predicate;
+ return this;
+ }
+
+ @Override
+ public FilterPredicate filter() {
+ return not(predicate.filter());
+ }
+
+ @Override
+ public String toString() {
+ return "NOT(" + predicate.toString() + ")";
+ }
+ }
+
+ /**
+ * An AND predicate that can be evaluated by the FileInputFormat.
+ */
+ public static class And implements Predicate {
+
+ private static final long serialVersionUID = 1L;
+
+ private Predicate[] predicates;
+
+ /**
+ * Returns an AND predicate.
+ */
+ public static And getInstance() {
+ return new And();
+ }
+
+ /**
+ * Binds predicates to create an AND predicate.
+ *
+ * @param predicates The conjunctive predicates.
+ * @return An AND predicate.
+ */
+ public Predicate bindPredicates(Predicate... predicates) {
+ this.predicates = predicates;
+ return this;
+ }
+
+ @Override
+ public FilterPredicate filter() {
+ return and(predicates[0].filter(), predicates[1].filter());
+ }
+
+ @Override
+ public String toString() {
+ return "AND(" + Arrays.toString(predicates) + ")";
+ }
+ }
+
+ /**
+ * An OR predicate that can be evaluated by the FileInputFormat.
+ */
+ public static class Or implements Predicate {
+
+ private static final long serialVersionUID = 1L;
+
+ private Predicate[] predicates;
+
+ /**
+ * Returns an OR predicate.
+ */
+ public static Or getInstance() {
+ return new Or();
+ }
+
+ /**
+ * Binds predicates to create an OR predicate.
+ *
+ * @param predicates The disjunctive predicates.
+ * @return An OR predicate.
+ */
+ public Predicate bindPredicates(Predicate... predicates) {
+ this.predicates = predicates;
+ return this;
+ }
+
+ @Override
+ public FilterPredicate filter() {
+ return or(predicates[0].filter(), predicates[1].filter());
+ }
+
+ @Override
+ public String toString() {
+ return "OR(" + Arrays.toString(predicates) + ")";
+ }
+ }
+
+ private static FilterPredicate toParquetPredicate(FunctionDefinition functionDefinition, LogicalType literalType, String columnName, Serializable literal) {
+ switch (literalType.getTypeRoot()) {
+ case BOOLEAN:
+ return predicateSupportsEqNotEq(functionDefinition, booleanColumn(columnName), (Boolean) literal);
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case TIME_WITHOUT_TIME_ZONE:
+ return predicateSupportsLtGt(functionDefinition, intColumn(columnName), (Integer) literal);
+ case BIGINT:
+ case DATE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return predicateSupportsLtGt(functionDefinition, longColumn(columnName), (Long) literal);
+ case FLOAT:
+ return predicateSupportsLtGt(functionDefinition, floatColumn(columnName), (Float) literal);
+ case DOUBLE:
+ return predicateSupportsLtGt(functionDefinition, doubleColumn(columnName), (Double) literal);
+ case BINARY:
+ case VARBINARY:
+ return predicateSupportsLtGt(functionDefinition, binaryColumn(columnName), fromConstantByteArray((byte[]) literal));
+ case CHAR:
+ case VARCHAR:
+ return predicateSupportsLtGt(functionDefinition, binaryColumn(columnName), fromString((String) literal));
+ default:
+ return null;
+ }
+ }
+
+ private static <T extends Comparable<T>, C extends Operators.Column<T> & Operators.SupportsEqNotEq> FilterPredicate predicateSupportsEqNotEq(
+ FunctionDefinition functionDefinition, C column, T value) {
+ if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+ return eq(column, value);
+ } else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+ return notEq(column, value);
+ } else {
+ throw new AssertionError("Unexpected function definition " + functionDefinition);
+ }
+ }
+
+ private static <T extends Comparable<T>, C extends Operators.Column<T> & Operators.SupportsLtGt> FilterPredicate predicateSupportsLtGt(
+ FunctionDefinition functionDefinition, C column, T value) {
+ if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+ return eq(column, value);
+ } else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+ return notEq(column, value);
+ } else if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+ return lt(column, value);
+ } else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+ return gt(column, value);
+ } else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+ return ltEq(column, value);
+ } else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+ return gtEq(column, value);
+ } else {
+ throw new AssertionError("Unexpected function definition " + functionDefinition);
+ }
+ }
+}
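[Editor's note] For context, a minimal sketch (not part of the commit) of how the converter above is driven; it mirrors the constructor calls used in TestExpressionPredicates further down, and the column name "f_int" and literal 10 are illustrative:

import java.util.Arrays;
import java.util.List;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.parquet.filter2.predicate.FilterPredicate;

import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.ExpressionPredicates.Predicate;

public class PredicateConversionExample {
  public static void main(String[] args) {
    // f_int > 10, as the Flink planner would hand it to applyFilters
    List<ResolvedExpression> children = Arrays.asList(
        new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0),
        new ValueLiteralExpression(10));
    CallExpression greaterThan = new CallExpression(
        BuiltInFunctionDefinitions.GREATER_THAN, children, DataTypes.BOOLEAN());

    // step 1: map to a predicate instance; step 2: bind field reference and literal
    Predicate predicate = ExpressionPredicates.fromExpression(greaterThan);

    // the parquet-level filter, equivalent to gt(intColumn("f_int"), 10)
    FilterPredicate filter = predicate.filter();
    System.out.println(predicate + " -> " + filter);
  }
}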
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 540f1a8c79d..03eb3205e8c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -40,6 +40,8 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.ExpressionPredicates;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
@@ -134,6 +136,7 @@ public class HoodieTableSource implements
private int[] requiredPos;
private long limit;
+ private List<Predicate> predicates;
private DataPruner dataPruner;
private PartitionPruners.PartitionPruner partitionPruner;
private int dataBucket;
@@ -145,7 +148,7 @@ public class HoodieTableSource implements
List<String> partitionKeys,
String defaultPartName,
Configuration conf) {
- this(schema, path, partitionKeys, defaultPartName, conf, null, null, PrimaryKeyPruners.BUCKET_ID_NO_PRUNING, null, null, null, null);
+ this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, PrimaryKeyPruners.BUCKET_ID_NO_PRUNING, null, null, null, null);
}
public HoodieTableSource(
@@ -154,6 +157,7 @@ public class HoodieTableSource implements
List<String> partitionKeys,
String defaultPartName,
Configuration conf,
+ @Nullable List<Predicate> predicates,
@Nullable DataPruner dataPruner,
@Nullable PartitionPruners.PartitionPruner partitionPruner,
int dataBucket,
@@ -167,6 +171,7 @@ public class HoodieTableSource implements
this.partitionKeys = partitionKeys;
this.defaultPartName = defaultPartName;
this.conf = conf;
+ this.predicates = predicates == null ? Collections.emptyList() : predicates;
this.dataPruner = dataPruner;
this.partitionPruner = partitionPruner;
this.dataBucket = dataBucket;
@@ -230,7 +235,7 @@ public class HoodieTableSource implements
@Override
public DynamicTableSource copy() {
return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
- conf, dataPruner, partitionPruner, dataBucket, requiredPos, limit, metaClient, internalSchemaManager);
+ conf, predicates, dataPruner, partitionPruner, dataBucket, requiredPos, limit, metaClient, internalSchemaManager);
}
@Override
@@ -242,6 +247,7 @@ public class HoodieTableSource implements
public Result applyFilters(List<ResolvedExpression> filters) {
List<ResolvedExpression> simpleFilters = filterSimpleCallExpression(filters);
Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitFilters = splitExprByPartitionCall(simpleFilters, this.partitionKeys, this.tableRowType);
+ this.predicates = ExpressionPredicates.fromExpression(splitFilters.f0);
this.dataPruner = DataPruner.newInstance(splitFilters.f0);
this.partitionPruner = cratePartitionPruner(splitFilters.f1);
this.dataBucket = getDataBucket(splitFilters.f0);
@@ -474,6 +480,7 @@ public class HoodieTableSource implements
// is not very stable.
.fieldTypes(rowDataType.getChildren())
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+ .predicates(this.predicates)
.limit(this.limit)
.emitDelete(false) // the change logs iterator can handle the DELETE records
.build();
@@ -500,6 +507,7 @@ public class HoodieTableSource implements
// is not very stable.
.fieldTypes(rowDataType.getChildren())
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+ .predicates(this.predicates)
.limit(this.limit)
.emitDelete(emitDelete)
.internalSchemaManager(internalSchemaManager)
@@ -530,6 +538,7 @@ public class HoodieTableSource implements
this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
+ this.predicates,
this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
getParquetConf(this.conf, this.hadoopConf),
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
@@ -600,6 +609,11 @@ public class HoodieTableSource implements
return fileIndex.getFilesInPartitions();
}
+ @VisibleForTesting
+ public List<Predicate> getPredicates() {
+ return predicates;
+ }
+
@VisibleForTesting
public DataPruner getDataPruner() {
return dataPruner;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index b6be67df55a..711ed446713 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -21,17 +21,29 @@ package org.apache.hudi.table.format;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.util.RowDataProjection;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.SerializationUtil;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.hadoop.ParquetInputFormat.FILTER_PREDICATE;
+import static org.apache.parquet.hadoop.ParquetInputFormat.UNBOUND_RECORD_FILTER;
+
/**
* Factory clazz for record iterators.
*/
@@ -49,7 +61,17 @@ public abstract class RecordIterators {
int batchSize,
Path path,
long splitStart,
- long splitLength) throws IOException {
+ long splitLength,
+ List<Predicate> predicates) throws IOException {
+ FilterPredicate filterPredicate = getFilterPredicate(conf);
+ for (Predicate predicate : predicates) {
+ FilterPredicate filter = predicate.filter();
+ if (filter != null) {
+ filterPredicate = filterPredicate == null ? filter : and(filterPredicate, filter);
+ }
+ }
+ UnboundRecordFilter recordFilter = getUnboundRecordFilterInstance(conf);
+
InternalSchema mergeSchema = internalSchemaManager.getMergeSchema(path.getName());
if (mergeSchema.isEmptySchema()) {
return new ParquetSplitRecordIterator(
@@ -64,7 +86,9 @@ public abstract class RecordIterators {
batchSize,
path,
splitStart,
- splitLength));
+ splitLength,
+ filterPredicate,
+ recordFilter));
} else {
CastMap castMap = internalSchemaManager.getCastMap(mergeSchema, fieldNames, fieldTypes, selectedFields);
Option<RowDataProjection> castProjection = castMap.toRowDataProjection(selectedFields);
@@ -80,7 +104,9 @@ public abstract class RecordIterators {
batchSize,
path,
splitStart,
- splitLength));
+ splitLength,
+ filterPredicate,
+ recordFilter));
if (castProjection.isPresent()) {
return new SchemaEvolvedRecordIterator(itr, castProjection.get());
} else {
@@ -88,4 +114,32 @@ public abstract class RecordIterators {
}
}
}
+
+ private static FilterPredicate getFilterPredicate(Configuration configuration) {
+ try {
+ return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) {
+ Class<?> clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
+ if (clazz == null) {
+ return null;
+ }
+
+ try {
+ UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance();
+
+ if (unboundRecordFilter instanceof Configurable) {
+ ((Configurable) unboundRecordFilter).setConf(configuration);
+ }
+
+ return unboundRecordFilter;
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new BadConfigurationException(
+ "could not instantiate unbound record filter class", e);
+ }
+ }
}
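[Editor's note] The loop added to the record-iterator factory above folds every translatable predicate into one conjunctive FilterPredicate and skips the rest (a null filter means the predicate cannot be expressed in parquet, e.g. AlwaysNull or an oversized IN). A standalone sketch of that fold, using only the classes introduced in this commit:

import java.util.List;

import org.apache.parquet.filter2.predicate.FilterPredicate;

import org.apache.hudi.source.ExpressionPredicates.Predicate;

import static org.apache.parquet.filter2.predicate.FilterApi.and;

public final class PredicateFolding {

  // AND together every translatable predicate; a null result means "no filter"
  public static FilterPredicate fold(List<Predicate> predicates) {
    FilterPredicate combined = null;
    for (Predicate predicate : predicates) {
      FilterPredicate filter = predicate.filter();
      if (filter != null) {
        combined = combined == null ? filter : and(combined, filter);
      }
    }
    return combined;
  }
}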
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 124f8482b6f..154df81a0d4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
@@ -88,9 +89,10 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
MergeOnReadTableState tableState,
List<DataType> fieldTypes,
String defaultPartName,
+ List<Predicate> predicates,
long limit,
boolean emitDelete) {
- super(conf, tableState, fieldTypes, defaultPartName, limit, emitDelete, InternalSchemaManager.DISABLED);
+ super(conf, tableState, fieldTypes, defaultPartName, predicates, limit, emitDelete, InternalSchemaManager.DISABLED);
}
@Override
@@ -701,6 +703,11 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
return this;
}
+ public Builder predicates(List<Predicate> predicates) {
+ this.predicates = predicates;
+ return this;
+ }
+
public Builder limit(long limit) {
this.limit = limit;
return this;
@@ -713,7 +720,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
public CdcInputFormat build() {
return new CdcInputFormat(conf, tableState, fieldTypes,
- defaultPartName, limit, emitDelete);
+ defaultPartName, predicates, limit, emitDelete);
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index ec9b0b02a7b..5b365a58990 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -18,9 +18,9 @@
package org.apache.hudi.table.format.cow;
-import java.util.Comparator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.RecordIterators;
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -75,6 +76,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
private final boolean hiveStylePartitioning;
private final boolean utcTimestamp;
private final SerializableConfiguration conf;
+ private final List<Predicate> predicates;
private final long limit;
private transient ClosableIterator<RowData> itr;
@@ -95,11 +97,13 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
String partDefaultName,
String partPathField,
boolean hiveStylePartitioning,
+ List<Predicate> predicates,
long limit,
Configuration conf,
boolean utcTimestamp,
InternalSchemaManager internalSchemaManager) {
super.setFilePaths(paths);
+ this.predicates = predicates;
this.limit = limit;
this.partDefaultName = partDefaultName;
this.partPathField = partPathField;
@@ -135,7 +139,8 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
2048,
fileSplit.getPath(),
fileSplit.getStart(),
- fileSplit.getLength());
+ fileSplit.getLength(),
+ predicates);
this.currentReadCount = 0L;
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 23a3934aeb9..f13098fc7c7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
@@ -123,6 +124,9 @@ public class MergeOnReadInputFormat
*/
private final int[] requiredPos;
+ // for predicate push down
+ private final List<Predicate> predicates;
+
// for limit push down
/**
* Limit for the reader, -1 when the reading is not limited.
@@ -152,6 +156,7 @@ public class MergeOnReadInputFormat
MergeOnReadTableState tableState,
List<DataType> fieldTypes,
String defaultPartName,
+ List<Predicate> predicates,
long limit,
boolean emitDelete,
InternalSchemaManager internalSchemaManager) {
@@ -163,6 +168,7 @@ public class MergeOnReadInputFormat
// Needs improvement: this requiredPos is only suitable for parquet reader,
// because we need to
this.requiredPos = tableState.getRequiredPositions();
+ this.predicates = predicates;
this.limit = limit;
this.emitDelete = emitDelete;
this.internalSchemaManager = internalSchemaManager;
@@ -336,7 +342,8 @@ public class MergeOnReadInputFormat
2048,
new org.apache.flink.core.fs.Path(path),
0,
- Long.MAX_VALUE); // read the whole file
+ Long.MAX_VALUE, // read the whole file
+ predicates);
}
private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
@@ -845,6 +852,7 @@ public class MergeOnReadInputFormat
protected MergeOnReadTableState tableState;
protected List<DataType> fieldTypes;
protected String defaultPartName;
+ protected List<Predicate> predicates;
protected long limit = -1;
protected boolean emitDelete = false;
protected InternalSchemaManager internalSchemaManager = InternalSchemaManager.DISABLED;
@@ -869,6 +877,11 @@ public class MergeOnReadInputFormat
return this;
}
+ public Builder predicates(List<Predicate> predicates) {
+ this.predicates = predicates;
+ return this;
+ }
+
public Builder limit(long limit) {
this.limit = limit;
return this;
@@ -886,7 +899,7 @@ public class MergeOnReadInputFormat
public MergeOnReadInputFormat build() {
return new MergeOnReadInputFormat(conf, tableState, fieldTypes,
- defaultPartName, limit, emitDelete, internalSchemaManager);
+ defaultPartName, predicates, limit, emitDelete, internalSchemaManager);
}
}
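[Editor's note] Taken together with the builder changes above, wiring the pushed-down predicates into the format looks roughly like the following sketch; builder() and config() are assumed from the surrounding Hudi builder code (not shown in this diff), and the setup objects are taken as given, since HoodieTableSource prepares them from the table schema and options:

import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.DataType;

import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;

public final class MorFormatWiring {

  public static MergeOnReadInputFormat create(
      Configuration conf,
      MergeOnReadTableState tableState,
      List<DataType> fieldTypes,
      String defaultPartName,
      List<Predicate> predicates) {
    return MergeOnReadInputFormat.builder()
        .config(conf)
        .tableState(tableState)
        .fieldTypes(fieldTypes)
        .defaultPartName(defaultPartName)
        .predicates(predicates) // new: forwarded to the parquet reader
        .limit(-1)              // -1 means no limit push down
        .emitDelete(false)
        .build();
  }
}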
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
new file mode 100644
index 00000000000..97b06644266
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.source.ExpressionPredicates.And;
+import org.apache.hudi.source.ExpressionPredicates.Equals;
+import org.apache.hudi.source.ExpressionPredicates.GreaterThan;
+import org.apache.hudi.source.ExpressionPredicates.GreaterThanOrEqual;
+import org.apache.hudi.source.ExpressionPredicates.In;
+import org.apache.hudi.source.ExpressionPredicates.LessThan;
+import org.apache.hudi.source.ExpressionPredicates.LessThanOrEqual;
+import org.apache.hudi.source.ExpressionPredicates.Not;
+import org.apache.hudi.source.ExpressionPredicates.NotEquals;
+import org.apache.hudi.source.ExpressionPredicates.Or;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hudi.source.ExpressionPredicates.fromExpression;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test cases for {@link ExpressionPredicates}.
+ */
+public class TestExpressionPredicates {
+
+ @Test
+ public void testFilterPredicateFromExpression() {
+ FieldReferenceExpression fieldReference = new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0);
+ ValueLiteralExpression valueLiteral = new ValueLiteralExpression(10);
+ List<ResolvedExpression> expressions = Arrays.asList(fieldReference, valueLiteral);
+ IntColumn intColumn = intColumn("f_int");
+
+ // equals
+ CallExpression equalsExpression = new CallExpression(
+ BuiltInFunctionDefinitions.EQUALS, expressions, DataTypes.BOOLEAN());
+ Predicate predicate1 = Equals.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+ Eq<Integer> eq = eq(intColumn, 10);
+ Predicate predicate2 = fromExpression(equalsExpression);
+ assertEquals(predicate1.toString(), predicate2.toString());
+ assertEquals(eq, predicate2.filter());
+
+ // not equals
+ CallExpression notEqualsExpression = new CallExpression(
+ BuiltInFunctionDefinitions.NOT_EQUALS, expressions, DataTypes.BOOLEAN());
+ Predicate predicate3 = NotEquals.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+ Predicate predicate4 = fromExpression(notEqualsExpression);
+ assertEquals(predicate3.toString(), predicate4.toString());
+ assertEquals(notEq(intColumn, 10), predicate4.filter());
+
+ // less than
+ CallExpression lessThanExpression = new CallExpression(
+ BuiltInFunctionDefinitions.LESS_THAN, expressions, DataTypes.BOOLEAN());
+ Predicate predicate5 = LessThan.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+ Lt<Integer> lt = lt(intColumn, 10);
+ Predicate predicate6 = fromExpression(lessThanExpression);
+ assertEquals(predicate5.toString(), predicate6.toString());
+ assertEquals(lt, predicate6.filter());
+
+ // greater than
+ CallExpression greaterThanExpression = new CallExpression(
+ BuiltInFunctionDefinitions.GREATER_THAN, expressions, DataTypes.BOOLEAN());
+ Predicate predicate7 = GreaterThan.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+ Gt<Integer> gt = gt(intColumn, 10);
+ Predicate predicate8 = fromExpression(greaterThanExpression);
+ assertEquals(predicate7.toString(), predicate8.toString());
+ assertEquals(gt, predicate8.filter());
+
+ // less than or equal
+ CallExpression lessThanOrEqualExpression = new CallExpression(
+ BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, expressions, DataTypes.BOOLEAN());
+ Predicate predicate9 = LessThanOrEqual.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+ Predicate predicate10 = fromExpression(lessThanOrEqualExpression);
+ assertEquals(predicate9.toString(), predicate10.toString());
+ assertEquals(ltEq(intColumn, 10), predicate10.filter());
+
+ // greater than or equal
+ CallExpression greaterThanOrEqualExpression = new CallExpression(
+ BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, expressions, DataTypes.BOOLEAN());
+ Predicate predicate11 = GreaterThanOrEqual.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+ Predicate predicate12 = fromExpression(greaterThanOrEqualExpression);
+ assertEquals(predicate11.toString(), predicate12.toString());
+ assertEquals(gtEq(intColumn, 10), predicate12.filter());
+
+ // in
+ ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(11);
+ ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression(12);
+ CallExpression inExpression = new CallExpression(
+ BuiltInFunctionDefinitions.IN,
+ Arrays.asList(fieldReference, valueLiteral1, valueLiteral2),
+ DataTypes.BOOLEAN());
+ Predicate predicate13 = In.getInstance().bindValueLiterals(Arrays.asList(valueLiteral1, valueLiteral2)).bindFieldReference(fieldReference);
+ Predicate predicate14 = fromExpression(inExpression);
+ assertEquals(predicate13.toString(), predicate14.toString());
+ assertEquals(or(eq(intColumn, 11), eq(intColumn, 12)), predicate14.filter());
+
+ // not
+ CallExpression notExpression = new CallExpression(
+ BuiltInFunctionDefinitions.NOT,
+ Collections.singletonList(equalsExpression),
+ DataTypes.BOOLEAN());
+ Predicate predicate15 = Not.getInstance().bindPredicate(predicate2);
+ Predicate predicate16 = fromExpression(notExpression);
+ assertEquals(predicate15.toString(), predicate16.toString());
+ assertEquals(not(eq), predicate16.filter());
+
+ // and
+ CallExpression andExpression = new CallExpression(
+ BuiltInFunctionDefinitions.AND,
+ Arrays.asList(lessThanExpression, greaterThanExpression),
+ DataTypes.BOOLEAN());
+ Predicate predicate17 = And.getInstance().bindPredicates(predicate6, predicate8);
+ Predicate predicate18 = fromExpression(andExpression);
+ assertEquals(predicate17.toString(), predicate18.toString());
+ assertEquals(and(lt, gt), predicate18.filter());
+
+ // or
+ CallExpression orExpression = new CallExpression(
+ BuiltInFunctionDefinitions.OR,
+ Arrays.asList(lessThanExpression, greaterThanExpression),
+ DataTypes.BOOLEAN());
+ Predicate predicate19 = Or.getInstance().bindPredicates(predicate6, predicate8);
+ Predicate predicate20 = fromExpression(orExpression);
+ assertEquals(predicate19.toString(), predicate20.toString());
+ assertEquals(or(lt, gt), predicate20.filter());
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 4ea92fbb845..40fb28619de 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -2036,6 +2036,20 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result4, expected4);
}
+ @Test
+ void testReadWithParquetPredicatePushDown() {
+ TableEnvironment tableEnv = batchTableEnv;
+ String hoodieTableDDL = sql("t1").option(FlinkOptions.PATH, tempFile.getAbsolutePath()).end();
+ tableEnv.executeSql(hoodieTableDDL);
+ execInsertSql(tableEnv, TestSQL.INSERT_T1);
+ // apply filters to push down predicates
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1 where uuid > 'id2' and age > 30 and ts > '1970-01-01 00:00:04'").execute().collect());
+ assertRowsEquals(result, "["
+ + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+ + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 2716dee2b1b..d0201620219 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.prune.DataPruner;
import org.apache.hudi.source.prune.PrimaryKeyPruners;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
@@ -55,6 +56,7 @@ import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -65,6 +67,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -291,6 +294,26 @@ public class TestHoodieTableSource {
assertThat(metaClient, is(tableSourceCopy.getMetaClient()));
}
+ @Test
+ void testFilterPushDownWithParquetPredicates() {
+ HoodieTableSource tableSource = getEmptyStreamingSource();
+ List<ResolvedExpression> expressions = new ArrayList<>();
+ expressions.add(new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0));
+ expressions.add(new ValueLiteralExpression(10));
+ ResolvedExpression equalsExpression = new CallExpression(
+ BuiltInFunctionDefinitions.EQUALS, expressions, DataTypes.BOOLEAN());
+ CallExpression greaterThanExpression = new CallExpression(
+ BuiltInFunctionDefinitions.GREATER_THAN, expressions, DataTypes.BOOLEAN());
+ CallExpression orExpression = new CallExpression(
+ BuiltInFunctionDefinitions.OR,
+ Arrays.asList(equalsExpression, greaterThanExpression),
+ DataTypes.BOOLEAN());
+ List<ResolvedExpression> expectedFilters = Arrays.asList(equalsExpression, greaterThanExpression, orExpression);
+ tableSource.applyFilters(expectedFilters);
+ String actualPredicates = tableSource.getPredicates().toString();
+ assertEquals(ExpressionPredicates.fromExpression(expectedFilters).toString(), actualPredicates);
+ }
+
private HoodieTableSource getEmptyStreamingSource() {
final String path = tempFile.getAbsolutePath();
conf = TestConfigurations.getDefaultConf(path);
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 4a9675d746a..622f499b64b 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
int batchSize,
Path path,
long splitStart,
- long splitLength) throws IOException {
+ long splitLength,
+ FilterPredicate filterPredicate,
+ UnboundRecordFilter recordFilter) throws IOException {
List<String> selNonPartNames = Arrays.stream(selectedFields)
.mapToObj(i -> fullFieldNames[i])
.filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
batchSize,
new org.apache.hadoop.fs.Path(path.toUri()),
splitStart,
- splitLength);
+ splitLength,
+ filterPredicate,
+ recordFilter);
}
private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 6922ada9acf..9436305d295 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
/**
* This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
int batchSize,
Path path,
long splitStart,
- long splitLength) throws IOException {
+ long splitLength,
+ FilterPredicate filterPredicate,
+ UnboundRecordFilter recordFilter) throws IOException {
this.utcTimestamp = utcTimestamp;
this.batchSize = batchSize;
// then we need to apply the predicate push down filter
ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
MessageType fileSchema = footer.getFileMetaData().getSchema();
- FilterCompat.Filter filter = getFilter(conf);
+ FilterCompat.Filter filter = get(filterPredicate, recordFilter);
List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
this.fileSchema = footer.getFileMetaData().getSchema();
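[Editor's note] With the constructor now receiving the filter objects directly, row-group pruning boils down to the three parquet calls shown above. A condensed, self-contained sketch (the sample file path and the age > 30 predicate are assumed for illustration):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.filter2.predicate.FilterApi.gt;
import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;

public class RowGroupPruningExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("file:///tmp/example.parquet"); // assumed sample file

    // footer for the split byte range (here: the whole file)
    ParquetMetadata footer = readFooter(conf, path, range(0, Long.MAX_VALUE));
    MessageType fileSchema = footer.getFileMetaData().getSchema();

    // build the compat filter directly instead of reading it from the conf
    FilterCompat.Filter filter = FilterCompat.get(gt(intColumn("age"), 30), null);

    // only row groups whose statistics may contain age > 30 survive
    List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
    System.out.println("row groups to read: " + blocks.size());
  }
}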
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index a7bd063c746..7e611a5e2cb 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
int batchSize,
Path path,
long splitStart,
- long splitLength) throws IOException {
+ long splitLength,
+ FilterPredicate filterPredicate,
+ UnboundRecordFilter recordFilter) throws IOException {
List<String> selNonPartNames = Arrays.stream(selectedFields)
.mapToObj(i -> fullFieldNames[i])
.filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
batchSize,
new org.apache.hadoop.fs.Path(path.toUri()),
splitStart,
- splitLength);
+ splitLength,
+ filterPredicate,
+ recordFilter);
}
private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1826d5bea4c..4eb91988403 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
/**
 * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
int batchSize,
Path path,
long splitStart,
- long splitLength) throws IOException {
+ long splitLength,
+ FilterPredicate filterPredicate,
+ UnboundRecordFilter recordFilter) throws IOException {
this.utcTimestamp = utcTimestamp;
this.batchSize = batchSize;
// then we need to apply the predicate push down filter
ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
MessageType fileSchema = footer.getFileMetaData().getSchema();
- FilterCompat.Filter filter = getFilter(conf);
+ FilterCompat.Filter filter = get(filterPredicate, recordFilter);
List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
this.fileSchema = footer.getFileMetaData().getSchema();
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e10f975bc29..3071ecc122d 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
int batchSize,
Path path,
long splitStart,
- long splitLength) throws IOException {
+ long splitLength,
+ FilterPredicate filterPredicate,
+ UnboundRecordFilter recordFilter) throws IOException {
List<String> selNonPartNames = Arrays.stream(selectedFields)
.mapToObj(i -> fullFieldNames[i])
.filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
batchSize,
new org.apache.hadoop.fs.Path(path.toUri()),
splitStart,
- splitLength);
+ splitLength,
+ filterPredicate,
+ recordFilter);
}
private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1872ec385b4..65912cef671 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
/**
 * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
int batchSize,
Path path,
long splitStart,
- long splitLength) throws IOException {
+ long splitLength,
+ FilterPredicate filterPredicate,
+ UnboundRecordFilter recordFilter) throws IOException {
this.utcTimestamp = utcTimestamp;
this.batchSize = batchSize;
// then we need to apply the predicate push down filter
ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
MessageType fileSchema = footer.getFileMetaData().getSchema();
- FilterCompat.Filter filter = getFilter(conf);
+ FilterCompat.Filter filter = get(filterPredicate, recordFilter);
List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
this.fileSchema = footer.getFileMetaData().getSchema();
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e10f975bc29..3071ecc122d 100644
--- a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
int batchSize,
Path path,
long splitStart,
- long splitLength) throws IOException {
+ long splitLength,
+ FilterPredicate filterPredicate,
+ UnboundRecordFilter recordFilter) throws IOException {
List<String> selNonPartNames = Arrays.stream(selectedFields)
.mapToObj(i -> fullFieldNames[i])
.filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
batchSize,
new org.apache.hadoop.fs.Path(path.toUri()),
splitStart,
- splitLength);
+ splitLength,
+ filterPredicate,
+ recordFilter);
}
private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1872ec385b4..65912cef671 100644
--- a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
/**
 * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
int batchSize,
Path path,
long splitStart,
- long splitLength) throws IOException {
+ long splitLength,
+ FilterPredicate filterPredicate,
+ UnboundRecordFilter recordFilter) throws IOException {
this.utcTimestamp = utcTimestamp;
this.batchSize = batchSize;
// then we need to apply the predicate push down filter
ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
MessageType fileSchema = footer.getFileMetaData().getSchema();
- FilterCompat.Filter filter = getFilter(conf);
+ FilterCompat.Filter filter = get(filterPredicate, recordFilter);
List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
this.fileSchema = footer.getFileMetaData().getSchema();
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e10f975bc29..3071ecc122d 100644
--- a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
int batchSize,
Path path,
long splitStart,
- long splitLength) throws IOException {
+ long splitLength,
+ FilterPredicate filterPredicate,
+ UnboundRecordFilter recordFilter) throws IOException {
List<String> selNonPartNames = Arrays.stream(selectedFields)
.mapToObj(i -> fullFieldNames[i])
.filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
batchSize,
new org.apache.hadoop.fs.Path(path.toUri()),
splitStart,
- splitLength);
+ splitLength,
+ filterPredicate,
+ recordFilter);
}
private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1872ec385b4..65912cef671 100644
--- a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
/**
 * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
int batchSize,
Path path,
long splitStart,
- long splitLength) throws IOException {
+ long splitLength,
+ FilterPredicate filterPredicate,
+ UnboundRecordFilter recordFilter) throws IOException {
this.utcTimestamp = utcTimestamp;
this.batchSize = batchSize;
// then we need to apply the predicate push down filter
ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
MessageType fileSchema = footer.getFileMetaData().getSchema();
- FilterCompat.Filter filter = getFilter(conf);
+ FilterCompat.Filter filter = get(filterPredicate, recordFilter);
List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
this.fileSchema = footer.getFileMetaData().getSchema();
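To see end to end the row-group pruning these patched constructors share, here is a self-contained sketch under stated assumptions: the file path /tmp/t.parquet and the int32 column "age" are hypothetical, and the class is not part of this commit. It uses the same Parquet calls the diff wires together (readFooter, FilterCompat.get, filterRowGroups) to count how many row groups survive a pushed-down predicate without reading any data pages:

// Hedged sketch; input file and column name are placeholder assumptions.
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

public class RowGroupPruningSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/t.parquet"); // hypothetical input file
    // Read only the footer metadata, as the patched constructors do.
    ParquetMetadata footer =
        ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
    MessageType schema = footer.getFileMetaData().getSchema();
    // Build the explicit filter that now replaces the Configuration lookup.
    FilterCompat.Filter filter =
        FilterCompat.get(FilterApi.gt(FilterApi.intColumn("age"), 18), null);
    // Drop row groups whose statistics prove no row can match the predicate.
    List<BlockMetaData> kept =
        RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), schema);
    System.out.println(kept.size() + " of " + footer.getBlocks().size()
        + " row groups survive the pushed-down predicate");
  }
}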