This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d4de459784940bd7f0443e051a3ff79c5d26c14c
Author: Nicholas Jiang <[email protected]>
AuthorDate: Fri Sep 1 09:36:45 2023 +0800

    [HUDI-6066] HoodieTableSource supports parquet predicate push down (#8437)
---
 .../apache/hudi/source/ExpressionPredicates.java   | 654 +++++++++++++++++++++
 .../org/apache/hudi/table/HoodieTableSource.java   |  18 +-
 .../apache/hudi/table/format/RecordIterators.java  |  60 +-
 .../hudi/table/format/cdc/CdcInputFormat.java      |  11 +-
 .../table/format/cow/CopyOnWriteInputFormat.java   |   9 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |  17 +-
 .../hudi/source/TestExpressionPredicates.java      | 167 ++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  14 +
 .../apache/hudi/table/TestHoodieTableSource.java   |  23 +
 .../table/format/cow/ParquetSplitReaderUtil.java   |  10 +-
 .../reader/ParquetColumnarRowSplitReader.java      |  10 +-
 .../table/format/cow/ParquetSplitReaderUtil.java   |  10 +-
 .../reader/ParquetColumnarRowSplitReader.java      |  10 +-
 .../table/format/cow/ParquetSplitReaderUtil.java   |  10 +-
 .../reader/ParquetColumnarRowSplitReader.java      |  10 +-
 .../table/format/cow/ParquetSplitReaderUtil.java   |  10 +-
 .../reader/ParquetColumnarRowSplitReader.java      |  10 +-
 .../table/format/cow/ParquetSplitReaderUtil.java   |  10 +-
 .../reader/ParquetColumnarRowSplitReader.java      |  10 +-
 19 files changed, 1037 insertions(+), 36 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
new file mode 100644
index 00000000000..046e4b739ad
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.util.ExpressionUtils.getValueFromLiteral;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.io.api.Binary.fromConstantByteArray;
+import static org.apache.parquet.io.api.Binary.fromString;
+
+/**
+ * Utilities to convert Flink {@link org.apache.flink.table.expressions.ResolvedExpression}s
+ * into {@link Predicate}s that can be pushed down as parquet filters.
+ */
+public class ExpressionPredicates {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpressionPredicates.class);
+
+  /**
+   * Translates every resolved expression of the list into its predicate counterpart.
+   *
+   * <p>Each element is cast to {@link CallExpression} before conversion, so the list
+   * is expected to contain call expressions only.
+   *
+   * @param resolvedExpressions The resolved expressions to convert.
+   * @return The converted predicates.
+   */
+  public static List<Predicate> fromExpression(List<ResolvedExpression> resolvedExpressions) {
+    List<Predicate> converted = resolvedExpressions.stream()
+        .map(expression -> {
+          CallExpression call = (CallExpression) expression;
+          return fromExpression(call);
+        })
+        .collect(Collectors.toList());
+    return converted;
+  }
+
+  /**
+   * Builds the predicate that corresponds to the given call expression.
+   *
+   * <p>The conversion happens in two steps:
+   * 1. pick the predicate instance that matches the function definition;
+   * 2. bind the field reference (and the literal values) to it.
+   *
+   * <p>Comparisons are normalized so that the literal always ends up in the RHS:
+   * when the field reference is not the first operand, the mirrored operator is used.
+   *
+   * @param callExpression The call expression to convert.
+   * @return The converted predicate.
+   */
+  public static Predicate fromExpression(CallExpression callExpression) {
+    final FunctionDefinition function = callExpression.getFunctionDefinition();
+    final List<Expression> operands = callExpression.getChildren();
+
+    // true when the field reference already sits in the LHS
+    final boolean fieldOnLeft = operands.get(0) instanceof FieldReferenceExpression;
+
+    // logical operators: convert the operands recursively
+    if (BuiltInFunctionDefinitions.NOT.equals(function)) {
+      Predicate negated = fromExpression((CallExpression) operands.get(0));
+      return Not.getInstance().bindPredicate(negated);
+    }
+    if (BuiltInFunctionDefinitions.AND.equals(function)) {
+      Predicate left = fromExpression((CallExpression) operands.get(0));
+      Predicate right = fromExpression((CallExpression) operands.get(1));
+      return And.getInstance().bindPredicates(left, right);
+    }
+    if (BuiltInFunctionDefinitions.OR.equals(function)) {
+      Predicate left = fromExpression((CallExpression) operands.get(0));
+      Predicate right = fromExpression((CallExpression) operands.get(1));
+      return Or.getInstance().bindPredicates(left, right);
+    }
+
+    // null checks and null literals are never pushed down
+    if (BuiltInFunctionDefinitions.IS_NULL.equals(function)
+        || BuiltInFunctionDefinitions.IS_NOT_NULL.equals(function)) {
+      return AlwaysNull.getInstance();
+    }
+    for (Expression operand : operands) {
+      if (operand instanceof ValueLiteralExpression
+          && getValueFromLiteral((ValueLiteralExpression) operand) == null) {
+        return AlwaysNull.getInstance();
+      }
+    }
+
+    // handle IN specifically: first operand is the field, the rest are the candidate values
+    if (BuiltInFunctionDefinitions.IN.equals(function)) {
+      checkState(fieldOnLeft, "The IN expression expects to be normalized");
+      FieldReferenceExpression inField = (FieldReferenceExpression) operands.get(0);
+      List<ValueLiteralExpression> candidates = IntStream.range(1, operands.size())
+          .mapToObj(position -> (ValueLiteralExpression) operands.get(position))
+          .collect(Collectors.toList());
+      return In.getInstance().bindValueLiterals(candidates).bindFieldReference(inField);
+    }
+
+    // handle binary operators
+    ColumnPredicate comparison = selectComparison(function, fieldOnLeft);
+    int fieldPosition = fieldOnLeft ? 0 : 1;
+    FieldReferenceExpression field = (FieldReferenceExpression) operands.get(fieldPosition);
+    ValueLiteralExpression value = (ValueLiteralExpression) operands.get(1 - fieldPosition);
+    return comparison.bindValueLiteral(value).bindFieldReference(field);
+  }
+
+  /**
+   * Picks the comparison predicate for a binary operator, mirroring the ordering
+   * operators when the field reference is not in the LHS.
+   */
+  private static ColumnPredicate selectComparison(FunctionDefinition function, boolean fieldOnLeft) {
+    if (BuiltInFunctionDefinitions.EQUALS.equals(function)) {
+      return Equals.getInstance();
+    }
+    if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(function)) {
+      return NotEquals.getInstance();
+    }
+    if (BuiltInFunctionDefinitions.LESS_THAN.equals(function)) {
+      return fieldOnLeft ? LessThan.getInstance() : GreaterThan.getInstance();
+    }
+    if (BuiltInFunctionDefinitions.GREATER_THAN.equals(function)) {
+      return fieldOnLeft ? GreaterThan.getInstance() : LessThan.getInstance();
+    }
+    if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(function)) {
+      return fieldOnLeft ? LessThanOrEqual.getInstance() : GreaterThanOrEqual.getInstance();
+    }
+    if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(function)) {
+      return fieldOnLeft ? GreaterThanOrEqual.getInstance() : LessThanOrEqual.getInstance();
+    }
+    throw new AssertionError("Unexpected function definition " + function);
+  }
+
+  // 
--------------------------------------------------------------------------------------------
+  //  Classes to define predicates
+  // 
--------------------------------------------------------------------------------------------
+
+  /**
+   * A filter predicate that can be evaluated by the FileInputFormat.
+   */
+  public interface Predicate extends Serializable {
+
+    /**
+     * Builds the parquet filter that decides which records to keep when loading data
+     * from a parquet file.
+     *
+     * @return A filter predicate of parquet file; implementations in this class return
+     *         {@code null} when nothing can be pushed down.
+     */
+    FilterPredicate filter();
+  }
+
+  /**
+   * Column predicate which depends on the given field.
+   *
+   * <p>Instances are created unbound; {@link #bindValueLiteral} and {@link #bindFieldReference}
+   * are expected to be called before {@link #filter()}.
+   */
+  public abstract static class ColumnPredicate implements Predicate {
+
+    // referenced field type
+    protected LogicalType literalType;
+
+    // referenced field name
+    protected String columnName;
+
+    // the constant literal value, null when the literal could not be used
+    protected Serializable literal;
+
+    /**
+     * Binds field reference to create a column predicate.
+     *
+     * @param fieldReference The field reference that provides the column name and type.
+     * @return A column predicate.
+     */
+    public ColumnPredicate bindFieldReference(FieldReferenceExpression fieldReference) {
+      this.literalType = fieldReference.getOutputDataType().getLogicalType();
+      this.columnName = fieldReference.getName();
+      return this;
+    }
+
+    /**
+     * Binds value literal to create a column predicate.
+     *
+     * @param valueLiteral The value literal to compare the column against.
+     * @return A column predicate.
+     */
+    public ColumnPredicate bindValueLiteral(ValueLiteralExpression valueLiteral) {
+      Object literalObject = getValueFromLiteral(valueLiteral);
+      // validate that literal is serializable
+      if (literalObject instanceof Serializable) {
+        this.literal = (Serializable) literalObject;
+      } else {
+        LOG.warn("Encountered a non-serializable literal. "
+            + "Cannot push predicate with value literal [{}] into FileInputFormat. "
+            + "This is a bug and should be reported.", valueLiteral);
+        this.literal = null;
+      }
+      return this;
+    }
+
+    /**
+     * Converts this predicate to a parquet filter.
+     *
+     * @return The parquet filter, or {@code null} when no usable literal is bound or the
+     *         column type is not supported for push down.
+     */
+    @Override
+    public FilterPredicate filter() {
+      if (literal == null) {
+        // No usable literal (e.g. it was not serializable): do not push anything down.
+        // Forwarding null would build a "column is null" comparison for eq()/notEq(),
+        // or fail outright when converting string/binary literals.
+        return null;
+      }
+      return toParquetPredicate(getFunctionDefinition(), literalType, columnName, literal);
+    }
+
+    /**
+     * Returns function definition of predicate.
+     *
+     * @return A function definition of predicate, {@code null} unless overridden.
+     */
+    public FunctionDefinition getFunctionDefinition() {
+      return null;
+    }
+  }
+
+  /**
+   * Column predicate for the EQUALS comparison that can be evaluated by the FileInputFormat.
+   */
+  public static class Equals extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a new, unbound EQUALS predicate.
+     *
+     * @return A EQUALS predicate instance.
+     */
+    public static Equals getInstance() {
+      final Equals instance = new Equals();
+      return instance;
+    }
+
+    @Override
+    public String toString() {
+      return String.join(" = ", columnName, String.valueOf(literal));
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.EQUALS;
+    }
+  }
+
+  /**
+   * Column predicate for the NOT_EQUALS comparison that can be evaluated by the FileInputFormat.
+   */
+  public static class NotEquals extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a new, unbound NOT_EQUALS predicate.
+     *
+     * @return A NOT_EQUALS predicate instance.
+     */
+    public static NotEquals getInstance() {
+      final NotEquals instance = new NotEquals();
+      return instance;
+    }
+
+    @Override
+    public String toString() {
+      return String.join(" != ", columnName, String.valueOf(literal));
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.NOT_EQUALS;
+    }
+  }
+
+  /**
+   * Column predicate for the LESS_THAN comparison that can be evaluated by the FileInputFormat.
+   */
+  public static class LessThan extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a new, unbound LESS_THAN predicate.
+     *
+     * @return A LESS_THAN predicate instance.
+     */
+    public static LessThan getInstance() {
+      final LessThan instance = new LessThan();
+      return instance;
+    }
+
+    @Override
+    public String toString() {
+      return String.join(" < ", columnName, String.valueOf(literal));
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.LESS_THAN;
+    }
+  }
+
+  /**
+   * Column predicate for the GREATER_THAN comparison that can be evaluated by the FileInputFormat.
+   */
+  public static class GreaterThan extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a new, unbound GREATER_THAN predicate.
+     *
+     * @return A GREATER_THAN predicate instance.
+     */
+    public static GreaterThan getInstance() {
+      final GreaterThan instance = new GreaterThan();
+      return instance;
+    }
+
+    @Override
+    public String toString() {
+      return String.join(" > ", columnName, String.valueOf(literal));
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.GREATER_THAN;
+    }
+  }
+
+  /**
+   * Column predicate for the LESS_THAN_OR_EQUAL comparison that can be evaluated by the
+   * FileInputFormat.
+   */
+  public static class LessThanOrEqual extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a new, unbound LESS_THAN_OR_EQUAL predicate.
+     *
+     * @return A LESS_THAN_OR_EQUAL predicate instance.
+     */
+    public static LessThanOrEqual getInstance() {
+      final LessThanOrEqual instance = new LessThanOrEqual();
+      return instance;
+    }
+
+    @Override
+    public String toString() {
+      return String.join(" <= ", columnName, String.valueOf(literal));
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL;
+    }
+  }
+
+  /**
+   * Column predicate for the GREATER_THAN_OR_EQUAL comparison that can be evaluated by the
+   * FileInputFormat.
+   */
+  public static class GreaterThanOrEqual extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a new, unbound GREATER_THAN_OR_EQUAL predicate.
+     *
+     * @return A GREATER_THAN_OR_EQUAL predicate instance.
+     */
+    public static GreaterThanOrEqual getInstance() {
+      final GreaterThanOrEqual instance = new GreaterThanOrEqual();
+      return instance;
+    }
+
+    @Override
+    public String toString() {
+      return String.join(" >= ", columnName, String.valueOf(literal));
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+      return BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL;
+    }
+  }
+
+  /**
+   * An IN predicate that can be evaluated by the FileInputFormat.
+   *
+   * <p>The predicate is pushed down as a chain of OR-ed equality filters, one per literal.
+   */
+  public static class In extends ColumnPredicate {
+
+    private static final long serialVersionUID = 1L;
+
+    // log under this class, not under the similarly named ExpressionEvaluators.In
+    private static final Logger LOG = LoggerFactory.getLogger(In.class);
+
+    // upper bound of literals for which the IN predicate is still pushed down
+    private static final int IN_PREDICATE_LIMIT = 200;
+
+    // the constant literal values
+    protected List<Serializable> literals;
+
+    /**
+     * Returns an IN predicate.
+     *
+     * @return An IN predicate instance.
+     */
+    public static In getInstance() {
+      return new In();
+    }
+
+    /**
+     * Binds value literals to create an IN predicate.
+     *
+     * @param valueLiterals The value literals the column is matched against.
+     * @return An IN predicate.
+     */
+    public ColumnPredicate bindValueLiterals(List<ValueLiteralExpression> valueLiterals) {
+      this.literals = valueLiterals.stream().map(valueLiteral -> {
+        Object literalObject = getValueFromLiteral(valueLiteral);
+        // validate that literal is serializable
+        if (literalObject instanceof Serializable) {
+          return (Serializable) literalObject;
+        } else {
+          LOG.warn("Encountered a non-serializable literal. "
+              + "Cannot push predicate with value literal [{}] into FileInputFormat. "
+              + "This is a bug and should be reported.", valueLiteral);
+          return null;
+        }
+      }).collect(Collectors.toList());
+      return this;
+    }
+
+    /**
+     * Converts this predicate to a parquet filter.
+     *
+     * @return The OR-ed equality filters, or {@code null} when any literal is unusable,
+     *         the literal count exceeds the limit, or nothing could be converted.
+     */
+    @Override
+    public FilterPredicate filter() {
+      if (literals.stream().anyMatch(Objects::isNull) || literals.size() > IN_PREDICATE_LIMIT) {
+        return null;
+      }
+
+      FilterPredicate filterPredicate = null;
+      for (Serializable literal : literals) {
+        FilterPredicate predicate = toParquetPredicate(BuiltInFunctionDefinitions.EQUALS, literalType, columnName, literal);
+        if (predicate != null) {
+          filterPredicate = filterPredicate == null ? predicate : or(filterPredicate, predicate);
+        }
+      }
+      return filterPredicate;
+    }
+
+    @Override
+    public String toString() {
+      return columnName + " IN(" + Arrays.toString(literals.toArray()) + ")";
+    }
+  }
+
+  /**
+   * A special predicate that never produces a parquet filter: {@link #filter()} always
+   * returns {@code null}. It is used for IS_NULL / IS_NOT_NULL calls and for calls that
+   * contain a null literal.
+   */
+  public static class AlwaysNull implements Predicate {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Returns a new predicate instance.
+     */
+    public static AlwaysNull getInstance() {
+      return new AlwaysNull();
+    }
+
+    // null signals that there is nothing to push down for this predicate
+    @Override
+    public FilterPredicate filter() {
+      return null;
+    }
+  }
+
+  /**
+   * A NOT predicate to negate a predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class Not implements Predicate {
+
+    private static final long serialVersionUID = 1L;
+
+    private Predicate predicate;
+
+    /**
+     * Returns a NOT predicate.
+     */
+    public static Not getInstance() {
+      return new Not();
+    }
+
+    /**
+     * Binds predicate to create a NOT predicate.
+     *
+     * @param predicate The predicate to negate.
+     * @return A NOT predicate.
+     */
+    public Predicate bindPredicate(Predicate predicate) {
+      this.predicate = predicate;
+      return this;
+    }
+
+    /**
+     * Converts this predicate to a parquet filter.
+     *
+     * @return The negated child filter, or {@code null} when the child has no filter.
+     */
+    @Override
+    public FilterPredicate filter() {
+      FilterPredicate inner = predicate.filter();
+      // A null child filter means "nothing to push down"; parquet's not() rejects null
+      // arguments, so propagate the null instead of failing.
+      return inner == null ? null : not(inner);
+    }
+
+    @Override
+    public String toString() {
+      return "NOT(" + predicate.toString() + ")";
+    }
+  }
+
+  /**
+   * An AND predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class And implements Predicate {
+
+    private static final long serialVersionUID = 1L;
+
+    private Predicate[] predicates;
+
+    /**
+     * Returns an AND predicate.
+     */
+    public static And getInstance() {
+      return new And();
+    }
+
+    /**
+     * Binds predicates to create an AND predicate.
+     *
+     * @param predicates The conjunctive predicates.
+     * @return An AND predicate.
+     */
+    public Predicate bindPredicates(Predicate... predicates) {
+      this.predicates = predicates;
+      return this;
+    }
+
+    /**
+     * Converts this predicate to a parquet filter by AND-ing the filters of all operands.
+     *
+     * @return The combined filter, or {@code null} when any operand has no filter.
+     */
+    @Override
+    public FilterPredicate filter() {
+      FilterPredicate combined = null;
+      for (Predicate operand : predicates) {
+        FilterPredicate operandFilter = operand.filter();
+        if (operandFilter == null) {
+          // parquet's and() rejects null arguments. Dropping only the missing operand
+          // would be unsafe if this AND is nested under a NOT, so give up the whole
+          // conjunction instead.
+          return null;
+        }
+        combined = combined == null ? operandFilter : and(combined, operandFilter);
+      }
+      return combined;
+    }
+
+    @Override
+    public String toString() {
+      return "AND(" + Arrays.toString(predicates) + ")";
+    }
+  }
+
+  /**
+   * An OR predicate that can be evaluated by the FileInputFormat.
+   */
+  public static class Or implements Predicate {
+
+    private static final long serialVersionUID = 1L;
+
+    private Predicate[] predicates;
+
+    /**
+     * Returns an OR predicate.
+     */
+    public static Or getInstance() {
+      return new Or();
+    }
+
+    /**
+     * Binds predicates to create an OR predicate.
+     *
+     * @param predicates The disjunctive predicates.
+     * @return An OR predicate.
+     */
+    public Predicate bindPredicates(Predicate... predicates) {
+      this.predicates = predicates;
+      return this;
+    }
+
+    /**
+     * Converts this predicate to a parquet filter by OR-ing the filters of all operands.
+     *
+     * @return The combined filter, or {@code null} when any operand has no filter.
+     */
+    @Override
+    public FilterPredicate filter() {
+      FilterPredicate combined = null;
+      for (Predicate operand : predicates) {
+        FilterPredicate operandFilter = operand.filter();
+        if (operandFilter == null) {
+          // parquet's or() rejects null arguments, and a disjunction with an operand
+          // that cannot be pushed down must not filter anything.
+          return null;
+        }
+        combined = combined == null ? operandFilter : or(combined, operandFilter);
+      }
+      return combined;
+    }
+
+    @Override
+    public String toString() {
+      return "OR(" + Arrays.toString(predicates) + ")";
+    }
+  }
+
+  /**
+   * Converts a single column comparison to a parquet filter predicate.
+   *
+   * <p>The logical type root of the column selects the parquet column kind and the Java
+   * type the literal is cast to.
+   *
+   * @param functionDefinition The comparison operator.
+   * @param literalType        The logical type of the referenced column.
+   * @param columnName         The name of the referenced column.
+   * @param literal            The literal to compare with; it is cast according to the type root.
+   * @return The parquet filter predicate, or {@code null} for type roots that are not listed.
+   */
+  private static FilterPredicate toParquetPredicate(FunctionDefinition 
functionDefinition, LogicalType literalType, String columnName, Serializable 
literal) {
+    switch (literalType.getTypeRoot()) {
+      case BOOLEAN:
+        // boolean columns only support = and !=
+        return predicateSupportsEqNotEq(functionDefinition, 
booleanColumn(columnName), (Boolean) literal);
+      // NOTE(review): TINYINT and SMALLINT literals are cast to Integer here; this assumes
+      // getValueFromLiteral already widens them to Integer - confirm.
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+      case TIME_WITHOUT_TIME_ZONE:
+        return predicateSupportsLtGt(functionDefinition, 
intColumn(columnName), (Integer) literal);
+      // NOTE(review): DATE is filtered through a long column and a Long literal; verify
+      // that this matches how DATE values are physically stored in the parquet files.
+      case BIGINT:
+      case DATE:
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        return predicateSupportsLtGt(functionDefinition, 
longColumn(columnName), (Long) literal);
+      case FLOAT:
+        return predicateSupportsLtGt(functionDefinition, 
floatColumn(columnName), (Float) literal);
+      case DOUBLE:
+        return predicateSupportsLtGt(functionDefinition, 
doubleColumn(columnName), (Double) literal);
+      case BINARY:
+      case VARBINARY:
+        return predicateSupportsLtGt(functionDefinition, 
binaryColumn(columnName), fromConstantByteArray((byte[]) literal));
+      case CHAR:
+      case VARCHAR:
+        return predicateSupportsLtGt(functionDefinition, 
binaryColumn(columnName), fromString((String) literal));
+      default:
+        // any other type root is not pushed down
+        return null;
+    }
+  }
+
+  /**
+   * Builds the parquet filter for a column that only supports equality checks.
+   *
+   * @param functionDefinition The comparison operator, EQUALS or NOT_EQUALS.
+   * @param column             The parquet column to filter on.
+   * @param value              The value to compare the column with.
+   * @return The parquet filter predicate.
+   */
+  private static <T extends Comparable<T>, C extends Operators.Column<T> & Operators.SupportsEqNotEq> FilterPredicate predicateSupportsEqNotEq(
+      FunctionDefinition functionDefinition, C column, T value) {
+    if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+      return eq(column, value);
+    }
+    if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+      return notEq(column, value);
+    }
+    // any other operator is a programming error for this column kind
+    throw new AssertionError("Unexpected function definition " + functionDefinition);
+  }
+
+  /**
+   * Builds the parquet filter for a column that supports equality and ordering checks.
+   *
+   * @param functionDefinition The comparison operator.
+   * @param column             The parquet column to filter on.
+   * @param value              The value to compare the column with.
+   * @return The parquet filter predicate.
+   */
+  private static <T extends Comparable<T>, C extends Operators.Column<T> & Operators.SupportsLtGt> FilterPredicate predicateSupportsLtGt(
+      FunctionDefinition functionDefinition, C column, T value) {
+    if (BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+      return eq(column, value);
+    }
+    if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+      return notEq(column, value);
+    }
+    if (BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+      return lt(column, value);
+    }
+    if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+      return gt(column, value);
+    }
+    if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+      return ltEq(column, value);
+    }
+    if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+      return gtEq(column, value);
+    }
+    // any other operator is a programming error for this column kind
+    throw new AssertionError("Unexpected function definition " + functionDefinition);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 540f1a8c79d..03eb3205e8c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -40,6 +40,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.ExpressionPredicates;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.source.FileIndex;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
@@ -134,6 +136,7 @@ public class HoodieTableSource implements
 
   private int[] requiredPos;
   private long limit;
+  private List<Predicate> predicates;
   private DataPruner dataPruner;
   private PartitionPruners.PartitionPruner partitionPruner;
   private int dataBucket;
@@ -145,7 +148,7 @@ public class HoodieTableSource implements
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf) {
-    this(schema, path, partitionKeys, defaultPartName, conf, null, null, 
PrimaryKeyPruners.BUCKET_ID_NO_PRUNING, null, null, null, null);
+    this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, 
PrimaryKeyPruners.BUCKET_ID_NO_PRUNING, null, null, null, null);
   }
 
   public HoodieTableSource(
@@ -154,6 +157,7 @@ public class HoodieTableSource implements
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf,
+      @Nullable List<Predicate> predicates,
       @Nullable DataPruner dataPruner,
       @Nullable PartitionPruners.PartitionPruner partitionPruner,
       int dataBucket,
@@ -167,6 +171,7 @@ public class HoodieTableSource implements
     this.partitionKeys = partitionKeys;
     this.defaultPartName = defaultPartName;
     this.conf = conf;
+    this.predicates = predicates == null ? Collections.emptyList() : 
predicates;
     this.dataPruner = dataPruner;
     this.partitionPruner = partitionPruner;
     this.dataBucket = dataBucket;
@@ -230,7 +235,7 @@ public class HoodieTableSource implements
   @Override
   public DynamicTableSource copy() {
     return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
-        conf, dataPruner, partitionPruner, dataBucket, requiredPos, limit, 
metaClient, internalSchemaManager);
+        conf, predicates, dataPruner, partitionPruner, dataBucket, 
requiredPos, limit, metaClient, internalSchemaManager);
   }
 
   @Override
@@ -242,6 +247,7 @@ public class HoodieTableSource implements
   public Result applyFilters(List<ResolvedExpression> filters) {
     List<ResolvedExpression> simpleFilters = 
filterSimpleCallExpression(filters);
     Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitFilters = 
splitExprByPartitionCall(simpleFilters, this.partitionKeys, this.tableRowType);
+    this.predicates = ExpressionPredicates.fromExpression(splitFilters.f0);
     this.dataPruner = DataPruner.newInstance(splitFilters.f0);
     this.partitionPruner = cratePartitionPruner(splitFilters.f1);
     this.dataBucket = getDataBucket(splitFilters.f0);
@@ -474,6 +480,7 @@ public class HoodieTableSource implements
         // is not very stable.
         .fieldTypes(rowDataType.getChildren())
         .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+        .predicates(this.predicates)
         .limit(this.limit)
         .emitDelete(false) // the change logs iterator can handle the DELETE 
records
         .build();
@@ -500,6 +507,7 @@ public class HoodieTableSource implements
         // is not very stable.
         .fieldTypes(rowDataType.getChildren())
         .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+        .predicates(this.predicates)
         .limit(this.limit)
         .emitDelete(emitDelete)
         .internalSchemaManager(internalSchemaManager)
@@ -530,6 +538,7 @@ public class HoodieTableSource implements
         this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
         this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
         this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
+        this.predicates,
         this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // 
ParquetInputFormat always uses the limit value
         getParquetConf(this.conf, this.hadoopConf),
         this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
@@ -600,6 +609,11 @@ public class HoodieTableSource implements
     return fileIndex.getFilesInPartitions();
   }
 
+  @VisibleForTesting
+  public List<Predicate> getPredicates() {
+    return predicates;
+  }
+
   @VisibleForTesting
   public DataPruner getDataPruner() {
     return dataPruner;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index b6be67df55a..711ed446713 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -21,17 +21,29 @@ package org.apache.hudi.table.format;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
 import org.apache.hudi.util.RowDataProjection;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.SerializationUtil;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.hadoop.ParquetInputFormat.FILTER_PREDICATE;
+import static 
org.apache.parquet.hadoop.ParquetInputFormat.UNBOUND_RECORD_FILTER;
+
 /**
  * Factory clazz for record iterators.
  */
@@ -49,7 +61,17 @@ public abstract class RecordIterators {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      List<Predicate> predicates) throws IOException {
+    FilterPredicate filterPredicate = getFilterPredicate(conf);
+    for (Predicate predicate : predicates) {
+      FilterPredicate filter = predicate.filter();
+      if (filter != null) {
+        filterPredicate = filterPredicate == null ? filter : 
and(filterPredicate, filter);
+      }
+    }
+    UnboundRecordFilter recordFilter = getUnboundRecordFilterInstance(conf);
+
     InternalSchema mergeSchema = 
internalSchemaManager.getMergeSchema(path.getName());
     if (mergeSchema.isEmptySchema()) {
       return new ParquetSplitRecordIterator(
@@ -64,7 +86,9 @@ public abstract class RecordIterators {
               batchSize,
               path,
               splitStart,
-              splitLength));
+              splitLength,
+              filterPredicate,
+              recordFilter));
     } else {
       CastMap castMap = internalSchemaManager.getCastMap(mergeSchema, fieldNames, fieldTypes, selectedFields);
       Option<RowDataProjection> castProjection = castMap.toRowDataProjection(selectedFields);
@@ -80,7 +104,9 @@ public abstract class RecordIterators {
               batchSize,
               path,
               splitStart,
-              splitLength));
+              splitLength,
+              filterPredicate,
+              recordFilter));
       if (castProjection.isPresent()) {
         return new SchemaEvolvedRecordIterator(itr, castProjection.get());
       } else {
@@ -88,4 +114,32 @@ public abstract class RecordIterators {
       }
     }
   }
+
+  private static FilterPredicate getFilterPredicate(Configuration configuration) {
+    try {
+      return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) {
+    Class<?> clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
+    if (clazz == null) {
+      return null;
+    }
+
+    try {
+      UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance();
+
+      if (unboundRecordFilter instanceof Configurable) {
+        ((Configurable) unboundRecordFilter).setConf(configuration);
+      }
+
+      return unboundRecordFilter;
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new BadConfigurationException(
+          "could not instantiate unbound record filter class", e);
+    }
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 124f8482b6f..154df81a0d4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
@@ -88,9 +89,10 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
       MergeOnReadTableState tableState,
       List<DataType> fieldTypes,
       String defaultPartName,
+      List<Predicate> predicates,
       long limit,
       boolean emitDelete) {
-    super(conf, tableState, fieldTypes, defaultPartName, limit, emitDelete, InternalSchemaManager.DISABLED);
+    super(conf, tableState, fieldTypes, defaultPartName, predicates, limit, emitDelete, InternalSchemaManager.DISABLED);
   }
 
   @Override
@@ -701,6 +703,11 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
       return this;
     }
 
+    public Builder predicates(List<Predicate> predicates) {
+      this.predicates = predicates;
+      return this;
+    }
+
     public Builder limit(long limit) {
       this.limit = limit;
       return this;
@@ -713,7 +720,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
 
     public CdcInputFormat build() {
       return new CdcInputFormat(conf, tableState, fieldTypes,
-          defaultPartName, limit, emitDelete);
+          defaultPartName, predicates, limit, emitDelete);
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index ec9b0b02a7b..5b365a58990 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -18,9 +18,9 @@
 
 package org.apache.hudi.table.format.cow;
 
-import java.util.Comparator;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.table.format.RecordIterators;
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -75,6 +76,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
   private final boolean hiveStylePartitioning;
   private final boolean utcTimestamp;
   private final SerializableConfiguration conf;
+  private final List<Predicate> predicates;
   private final long limit;
 
   private transient ClosableIterator<RowData> itr;
@@ -95,11 +97,13 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
       String partDefaultName,
       String partPathField,
       boolean hiveStylePartitioning,
+      List<Predicate> predicates,
       long limit,
       Configuration conf,
       boolean utcTimestamp,
       InternalSchemaManager internalSchemaManager) {
     super.setFilePaths(paths);
+    this.predicates = predicates;
     this.limit = limit;
     this.partDefaultName = partDefaultName;
     this.partPathField = partPathField;
@@ -135,7 +139,8 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
         2048,
         fileSplit.getPath(),
         fileSplit.getStart(),
-        fileSplit.getLength());
+        fileSplit.getLength(),
+        predicates);
     this.currentReadCount = 0L;
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 23a3934aeb9..f13098fc7c7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.InternalSchemaManager;
@@ -123,6 +124,9 @@ public class MergeOnReadInputFormat
    */
   private final int[] requiredPos;
 
+  // for predicate push down
+  private final List<Predicate> predicates;
+
   // for limit push down
   /**
    * Limit for the reader, -1 when the reading is not limited.
@@ -152,6 +156,7 @@ public class MergeOnReadInputFormat
       MergeOnReadTableState tableState,
       List<DataType> fieldTypes,
       String defaultPartName,
+      List<Predicate> predicates,
       long limit,
       boolean emitDelete,
       InternalSchemaManager internalSchemaManager) {
@@ -163,6 +168,7 @@ public class MergeOnReadInputFormat
     // Needs improvement: this requiredPos is only suitable for parquet reader,
     // because we need to
     this.requiredPos = tableState.getRequiredPositions();
+    this.predicates = predicates;
     this.limit = limit;
     this.emitDelete = emitDelete;
     this.internalSchemaManager = internalSchemaManager;
@@ -336,7 +342,8 @@ public class MergeOnReadInputFormat
         2048,
         new org.apache.flink.core.fs.Path(path),
         0,
-        Long.MAX_VALUE); // read the whole file
+        Long.MAX_VALUE, // read the whole file
+        predicates);
   }
 
   private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
@@ -845,6 +852,7 @@ public class MergeOnReadInputFormat
     protected MergeOnReadTableState tableState;
     protected List<DataType> fieldTypes;
     protected String defaultPartName;
+    protected List<Predicate> predicates;
     protected long limit = -1;
     protected boolean emitDelete = false;
     protected InternalSchemaManager internalSchemaManager = InternalSchemaManager.DISABLED;
@@ -869,6 +877,11 @@ public class MergeOnReadInputFormat
       return this;
     }
 
+    public Builder predicates(List<Predicate> predicates) {
+      this.predicates = predicates;
+      return this;
+    }
+
     public Builder limit(long limit) {
       this.limit = limit;
       return this;
@@ -886,7 +899,7 @@ public class MergeOnReadInputFormat
 
     public MergeOnReadInputFormat build() {
       return new MergeOnReadInputFormat(conf, tableState, fieldTypes,
-          defaultPartName, limit, emitDelete, internalSchemaManager);
+          defaultPartName, predicates, limit, emitDelete, internalSchemaManager);
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
new file mode 100644
index 00000000000..97b06644266
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.source.ExpressionPredicates.And;
+import org.apache.hudi.source.ExpressionPredicates.Equals;
+import org.apache.hudi.source.ExpressionPredicates.GreaterThan;
+import org.apache.hudi.source.ExpressionPredicates.GreaterThanOrEqual;
+import org.apache.hudi.source.ExpressionPredicates.In;
+import org.apache.hudi.source.ExpressionPredicates.LessThan;
+import org.apache.hudi.source.ExpressionPredicates.LessThanOrEqual;
+import org.apache.hudi.source.ExpressionPredicates.Not;
+import org.apache.hudi.source.ExpressionPredicates.NotEquals;
+import org.apache.hudi.source.ExpressionPredicates.Or;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hudi.source.ExpressionPredicates.fromExpression;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test cases for {@link ExpressionPredicates}.
+ */
+public class TestExpressionPredicates {
+
+  @Test
+  public void testFilterPredicateFromExpression() {
+    FieldReferenceExpression fieldReference = new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0);
+    ValueLiteralExpression valueLiteral = new ValueLiteralExpression(10);
+    List<ResolvedExpression> expressions = Arrays.asList(fieldReference, valueLiteral);
+    IntColumn intColumn = intColumn("f_int");
+
+    // equals
+    CallExpression equalsExpression = new CallExpression(
+        BuiltInFunctionDefinitions.EQUALS, expressions, DataTypes.BOOLEAN());
+    Predicate predicate1 = Equals.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+    Eq<Integer> eq = eq(intColumn, 10);
+    Predicate predicate2 = fromExpression(equalsExpression);
+    assertEquals(predicate1.toString(), predicate2.toString());
+    assertEquals(eq, predicate2.filter());
+
+    // not equals
+    CallExpression notEqualsExpression = new CallExpression(
+        BuiltInFunctionDefinitions.NOT_EQUALS, expressions, DataTypes.BOOLEAN());
+    Predicate predicate3 = NotEquals.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+    Predicate predicate4 = fromExpression(notEqualsExpression);
+    assertEquals(predicate3.toString(), predicate4.toString());
+    assertEquals(notEq(intColumn, 10), predicate4.filter());
+
+    // less than
+    CallExpression lessThanExpression = new CallExpression(
+        BuiltInFunctionDefinitions.LESS_THAN, expressions, DataTypes.BOOLEAN());
+    Predicate predicate5 = LessThan.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+    Lt<Integer> lt = lt(intColumn, 10);
+    Predicate predicate6 = fromExpression(lessThanExpression);
+    assertEquals(predicate5.toString(), predicate6.toString());
+    assertEquals(lt, predicate6.filter());
+
+    // greater than
+    CallExpression greaterThanExpression = new CallExpression(
+        BuiltInFunctionDefinitions.GREATER_THAN, expressions, DataTypes.BOOLEAN());
+    Predicate predicate7 = GreaterThan.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+    Gt<Integer> gt = gt(intColumn, 10);
+    Predicate predicate8 = fromExpression(greaterThanExpression);
+    assertEquals(predicate7.toString(), predicate8.toString());
+    assertEquals(gt, predicate8.filter());
+
+    // less than or equal
+    CallExpression lessThanOrEqualExpression = new CallExpression(
+        BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, expressions, DataTypes.BOOLEAN());
+    Predicate predicate9 = LessThanOrEqual.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+    Predicate predicate10 = fromExpression(lessThanOrEqualExpression);
+    assertEquals(predicate9.toString(), predicate10.toString());
+    assertEquals(ltEq(intColumn, 10), predicate10.filter());
+
+    // greater than or equal
+    CallExpression greaterThanOrEqualExpression = new CallExpression(
+        BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, expressions, DataTypes.BOOLEAN());
+    Predicate predicate11 = GreaterThanOrEqual.getInstance().bindValueLiteral(valueLiteral).bindFieldReference(fieldReference);
+    Predicate predicate12 = fromExpression(greaterThanOrEqualExpression);
+    assertEquals(predicate11.toString(), predicate12.toString());
+    assertEquals(gtEq(intColumn, 10), predicate12.filter());
+
+    // in
+    ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(11);
+    ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression(12);
+    CallExpression inExpression = new CallExpression(
+        BuiltInFunctionDefinitions.IN,
+        Arrays.asList(fieldReference, valueLiteral1, valueLiteral2),
+        DataTypes.BOOLEAN());
+    Predicate predicate13 = In.getInstance().bindValueLiterals(Arrays.asList(valueLiteral1, valueLiteral2)).bindFieldReference(fieldReference);
+    Predicate predicate14 = fromExpression(inExpression);
+    assertEquals(predicate13.toString(), predicate14.toString());
+    assertEquals(or(eq(intColumn, 11), eq(intColumn, 12)), predicate14.filter());
+
+    // not
+    CallExpression notExpression = new CallExpression(
+        BuiltInFunctionDefinitions.NOT,
+        Collections.singletonList(equalsExpression),
+        DataTypes.BOOLEAN());
+    Predicate predicate15 = Not.getInstance().bindPredicate(predicate2);
+    Predicate predicate16 = fromExpression(notExpression);
+    assertEquals(predicate15.toString(), predicate16.toString());
+    assertEquals(not(eq), predicate16.filter());
+
+    // and
+    CallExpression andExpression = new CallExpression(
+        BuiltInFunctionDefinitions.AND,
+        Arrays.asList(lessThanExpression, greaterThanExpression),
+        DataTypes.BOOLEAN());
+    Predicate predicate17 = And.getInstance().bindPredicates(predicate6, predicate8);
+    Predicate predicate18 = fromExpression(andExpression);
+    assertEquals(predicate17.toString(), predicate18.toString());
+    assertEquals(and(lt, gt), predicate18.filter());
+
+    // or
+    CallExpression orExpression = new CallExpression(
+        BuiltInFunctionDefinitions.OR,
+        Arrays.asList(lessThanExpression, greaterThanExpression),
+        DataTypes.BOOLEAN());
+    Predicate predicate19 = Or.getInstance().bindPredicates(predicate6, predicate8);
+    Predicate predicate20 = fromExpression(orExpression);
+    assertEquals(predicate19.toString(), predicate20.toString());
+    assertEquals(or(lt, gt), predicate20.filter());
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 4ea92fbb845..40fb28619de 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -2036,6 +2036,20 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result4, expected4);
   }
 
+  @Test
+  void testReadWithParquetPredicatePushDown() {
+    TableEnvironment tableEnv = batchTableEnv;
+    String hoodieTableDDL = sql("t1").option(FlinkOptions.PATH, tempFile.getAbsolutePath()).end();
+    tableEnv.executeSql(hoodieTableDDL);
+    execInsertSql(tableEnv, TestSQL.INSERT_T1);
+    // apply filters to push down predicates
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1 where uuid > 'id2' and age > 30 and ts > '1970-01-01 00:00:04'").execute().collect());
+    assertRowsEquals(result, "["
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 2716dee2b1b..d0201620219 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.source.prune.DataPruner;
 import org.apache.hudi.source.prune.PrimaryKeyPruners;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
@@ -55,6 +56,7 @@ import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -65,6 +67,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
@@ -291,6 +294,26 @@ public class TestHoodieTableSource {
     assertThat(metaClient, is(tableSourceCopy.getMetaClient()));
   }
 
+  @Test
+  void testFilterPushDownWithParquetPredicates() {
+    HoodieTableSource tableSource = getEmptyStreamingSource();
+    List<ResolvedExpression> expressions = new ArrayList<>();
+    expressions.add(new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0));
+    expressions.add(new ValueLiteralExpression(10));
+    ResolvedExpression equalsExpression = new CallExpression(
+        BuiltInFunctionDefinitions.EQUALS, expressions, DataTypes.BOOLEAN());
+    CallExpression greaterThanExpression = new CallExpression(
+        BuiltInFunctionDefinitions.GREATER_THAN, expressions, DataTypes.BOOLEAN());
+    CallExpression orExpression = new CallExpression(
+        BuiltInFunctionDefinitions.OR,
+        Arrays.asList(equalsExpression, greaterThanExpression),
+        DataTypes.BOOLEAN());
+    List<ResolvedExpression> expectedFilters = Arrays.asList(equalsExpression, greaterThanExpression, orExpression);
+    tableSource.applyFilters(expectedFilters);
+    String actualPredicates = tableSource.getPredicates().toString();
+    assertEquals(ExpressionPredicates.fromExpression(expectedFilters).toString(), actualPredicates);
+  }
+
   private HoodieTableSource getEmptyStreamingSource() {
     final String path = tempFile.getAbsolutePath();
     conf = TestConfigurations.getDefaultConf(path);
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 4a9675d746a..622f499b64b 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     List<String> selNonPartNames = Arrays.stream(selectedFields)
         .mapToObj(i -> fullFieldNames[i])
         .filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
         batchSize,
         new org.apache.hadoop.fs.Path(path.toUri()),
         splitStart,
-        splitLength);
+        splitLength,
+        filterPredicate,
+        recordFilter);
   }
 
   private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 6922ada9acf..9436305d295 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
 
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
  * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     this.utcTimestamp = utcTimestamp;
     this.batchSize = batchSize;
     // then we need to apply the predicate push down filter
     ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
     MessageType fileSchema = footer.getFileMetaData().getSchema();
-    FilterCompat.Filter filter = getFilter(conf);
+    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
     List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
     this.fileSchema = footer.getFileMetaData().getSchema();
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index a7bd063c746..7e611a5e2cb 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     List<String> selNonPartNames = Arrays.stream(selectedFields)
         .mapToObj(i -> fullFieldNames[i])
         .filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
         batchSize,
         new org.apache.hadoop.fs.Path(path.toUri()),
         splitStart,
-        splitLength);
+        splitLength,
+        filterPredicate,
+        recordFilter);
   }
 
   private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1826d5bea4c..4eb91988403 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
 
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
  * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     this.utcTimestamp = utcTimestamp;
     this.batchSize = batchSize;
     // then we need to apply the predicate push down filter
     ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
     MessageType fileSchema = footer.getFileMetaData().getSchema();
-    FilterCompat.Filter filter = getFilter(conf);
+    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
     List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
     this.fileSchema = footer.getFileMetaData().getSchema();
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e10f975bc29..3071ecc122d 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     List<String> selNonPartNames = Arrays.stream(selectedFields)
         .mapToObj(i -> fullFieldNames[i])
         .filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
         batchSize,
         new org.apache.hadoop.fs.Path(path.toUri()),
         splitStart,
-        splitLength);
+        splitLength,
+        filterPredicate,
+        recordFilter);
   }
 
   private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1872ec385b4..65912cef671 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
 
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
  * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     this.utcTimestamp = utcTimestamp;
     this.batchSize = batchSize;
     // then we need to apply the predicate push down filter
     ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
     MessageType fileSchema = footer.getFileMetaData().getSchema();
-    FilterCompat.Filter filter = getFilter(conf);
+    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
     List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
     this.fileSchema = footer.getFileMetaData().getSchema();
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e10f975bc29..3071ecc122d 100644
--- a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     List<String> selNonPartNames = Arrays.stream(selectedFields)
         .mapToObj(i -> fullFieldNames[i])
         .filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
         batchSize,
         new org.apache.hadoop.fs.Path(path.toUri()),
         splitStart,
-        splitLength);
+        splitLength,
+        filterPredicate,
+        recordFilter);
   }
 
   private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1872ec385b4..65912cef671 100644
--- a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
 
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
  * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     this.utcTimestamp = utcTimestamp;
     this.batchSize = batchSize;
     // then we need to apply the predicate push down filter
     ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
     MessageType fileSchema = footer.getFileMetaData().getSchema();
-    FilterCompat.Filter filter = getFilter(conf);
+    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
     List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
     this.fileSchema = footer.getFileMetaData().getSchema();
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e10f975bc29..3071ecc122d 100644
--- a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -72,6 +72,8 @@ import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.OriginalType;
@@ -115,7 +117,9 @@ public class ParquetSplitReaderUtil {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     List<String> selNonPartNames = Arrays.stream(selectedFields)
         .mapToObj(i -> fullFieldNames[i])
         .filter(n -> !partitionSpec.containsKey(n))
@@ -148,7 +152,9 @@ public class ParquetSplitReaderUtil {
         batchSize,
         new org.apache.hadoop.fs.Path(path.toUri()),
         splitStart,
-        splitLength);
+        splitLength,
+        filterPredicate,
+        recordFilter);
   }
 
   private static ColumnVector createVector(
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 1872ec385b4..65912cef671 100644
--- a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -53,10 +55,10 @@ import java.util.stream.IntStream;
 
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.FilterCompat.get;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
  * This reader is used to read a {@link VectorizedColumnBatch} from input split.
@@ -123,13 +125,15 @@ public class ParquetColumnarRowSplitReader implements Closeable {
       int batchSize,
       Path path,
       long splitStart,
-      long splitLength) throws IOException {
+      long splitLength,
+      FilterPredicate filterPredicate,
+      UnboundRecordFilter recordFilter) throws IOException {
     this.utcTimestamp = utcTimestamp;
     this.batchSize = batchSize;
     // then we need to apply the predicate push down filter
     ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
     MessageType fileSchema = footer.getFileMetaData().getSchema();
-    FilterCompat.Filter filter = getFilter(conf);
+    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
     List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
     this.fileSchema = footer.getFileMetaData().getSchema();

Reply via email to