This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 79428391bac [HUDI-5851] Improvement of data skipping, only converts expressions to evaluators once (#8051)
79428391bac is described below

commit 79428391bac7277ffa9e18c75594a6fb9b8c5665
Author: Jing Zhang <[email protected]>
AuthorDate: Fri Mar 10 14:53:16 2023 +0800

    [HUDI-5851] Improvement of data skipping, only converts expressions to evaluators once (#8051)
    
    * Add log to FileIndex about the data skipping info
    * Move all evaluators and relative utility in one class
---
 .../java/org/apache/hudi/source/DataPruner.java    | 140 +++++
 .../apache/hudi/source/ExpressionEvaluators.java   | 576 ++++++++++++++++++++
 .../java/org/apache/hudi/source/FileIndex.java     |  46 +-
 .../org/apache/hudi/source/stats/ColumnStats.java  |  72 +++
 .../hudi/source/stats/ExpressionEvaluator.java     | 605 ---------------------
 .../hudi/source/TestExpressionEvaluators.java      | 408 ++++++++++++++
 .../hudi/source/stats/TestExpressionEvaluator.java | 403 --------------
 .../apache/hudi/table/ITTestHoodieDataSource.java  |   7 +
 8 files changed, 1230 insertions(+), 1027 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/DataPruner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/DataPruner.java
new file mode 100644
index 00000000000..605fcdf7fb0
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/DataPruner.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.util.ExpressionUtils;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.source.ExpressionEvaluators.fromExpression;
+
+/**
+ * Utility to do data skipping with the column stats index.
+ *
+ * <p>The filters are converted into {@link ExpressionEvaluators.Evaluator}s once, when the
+ * pruner is created, and the evaluators are then reused for every row of column stats.
+ */
+public class DataPruner implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  // names of the columns referenced by the filters
+  private final String[] referencedCols;
+  // one evaluator per filter; a stats row is a candidate only if every evaluator may match
+  private final List<ExpressionEvaluators.Evaluator> evaluators;
+
+  private DataPruner(String[] referencedCols, List<ExpressionEvaluators.Evaluator> evaluators) {
+    this.referencedCols = referencedCols;
+    this.evaluators = evaluators;
+  }
+
+  /**
+   * Filters the index row with specific data filters and query fields.
+   *
+   * @param indexRow    The index row
+   * @param queryFields The query fields referenced by the filters
+   * @return true if the index row should be considered as a candidate
+   * @throws IllegalArgumentException if {@code indexRow} or {@code queryFields} is null
+   */
+  public boolean test(RowData indexRow, RowType.RowField[] queryFields) {
+    Map<String, ColumnStats> columnStatsMap = convertColumnStats(indexRow, queryFields);
+    for (ExpressionEvaluators.Evaluator evaluator : evaluators) {
+      if (!evaluator.eval(columnStatsMap)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns the names of the columns referenced by the filters.
+   */
+  public String[] getReferencedCols() {
+    return referencedCols;
+  }
+
+  /**
+   * Creates a data pruner for the given filters.
+   *
+   * @param filters the pushed down filters, may be null or empty
+   * @return the data pruner, or null if there are no filters or they reference no columns
+   */
+  public static DataPruner newInstance(List<ResolvedExpression> filters) {
+    if (filters == null || filters.isEmpty()) {
+      return null;
+    }
+    String[] referencedCols = ExpressionUtils.referencedColumns(filters);
+    if (referencedCols.length == 0) {
+      return null;
+    }
+    List<ExpressionEvaluators.Evaluator> evaluators = fromExpression(filters);
+    return new DataPruner(referencedCols, evaluators);
+  }
+
+  /**
+   * Converts an index row into a mapping from column name to {@link ColumnStats}.
+   *
+   * <p>For the i-th query field, the min value, max value and null count are read from
+   * positions {@code 2 + 3 * i}, {@code 3 + 3 * i} and {@code 4 + 3 * i} of the index row.
+   * NOTE(review): positions 0 and 1 are not read here; position 0 is presumably the file
+   * name - confirm against the column stats transposition.
+   *
+   * @param indexRow    the column stats index row
+   * @param queryFields the fields referenced by the filters, in index row order
+   * @return the column stats keyed by field name, in query field order
+   * @throws IllegalArgumentException if {@code indexRow} or {@code queryFields} is null
+   * @throws UnsupportedOperationException if a field has an unsupported type
+   */
+  public static Map<String, ColumnStats> convertColumnStats(RowData indexRow, RowType.RowField[] queryFields) {
+    if (indexRow == null || queryFields == null) {
+      throw new IllegalArgumentException("Index Row and query fields could not be null.");
+    }
+    Map<String, ColumnStats> mapping = new LinkedHashMap<>();
+    for (int i = 0; i < queryFields.length; i++) {
+      String name = queryFields[i].getName();
+      int startPos = 2 + i * 3;
+      LogicalType colType = queryFields[i].getType();
+      Object minVal = indexRow.isNullAt(startPos) ? null : getValAsJavaObj(indexRow, startPos, colType);
+      Object maxVal = indexRow.isNullAt(startPos + 1) ? null : getValAsJavaObj(indexRow, startPos + 1, colType);
+      long nullCnt = indexRow.getLong(startPos + 2);
+      mapping.put(name, new ColumnStats(minVal, maxVal, nullCnt));
+    }
+    return mapping;
+  }
+
+  /**
+   * Returns the value as Java object at position {@code pos} of row {@code indexRow}.
+   *
+   * @throws UnsupportedOperationException if {@code colType} is not supported
+   */
+  private static Object getValAsJavaObj(RowData indexRow, int pos, LogicalType colType) {
+    switch (colType.getTypeRoot()) {
+      // NOTE: Since we can't rely on Avro's "date", and "timestamp-micros" logical-types, we're
+      //       manually encoding corresponding values as int and long w/in the Column Stats Index and
+      //       here we have to decode those back into corresponding logical representation.
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        TimestampType tsType = (TimestampType) colType;
+        return indexRow.getTimestamp(pos, tsType.getPrecision()).getMillisecond();
+      case TIME_WITHOUT_TIME_ZONE:
+      case DATE:
+      // BIGINT was missing before, which made data skipping fail for any BIGINT column
+      case BIGINT:
+        return indexRow.getLong(pos);
+      case BOOLEAN:
+        return indexRow.getBoolean(pos);
+      // NOTE: All integral types of size less than Int are encoded as Ints in MT
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+        return indexRow.getInt(pos);
+      case FLOAT:
+        return indexRow.getFloat(pos);
+      case DOUBLE:
+        return indexRow.getDouble(pos);
+      case BINARY:
+      case VARBINARY:
+        return indexRow.getBinary(pos);
+      case CHAR:
+      case VARCHAR:
+        return indexRow.getString(pos).toString();
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) colType;
+        return indexRow.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()).toBigDecimal();
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + colType);
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
new file mode 100644
index 00000000000..a0cf9b1d89c
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.util.ExpressionUtils;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.validation.constraints.NotNull;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Tool to evaluate the {@link org.apache.flink.table.expressions.ResolvedExpression}s against
+ * column statistics for data skipping.
+ *
+ * <p>Every {@link Evaluator} answers "may any row summarized by the given column stats satisfy
+ * the predicate?". Returning {@code false} allows the data to be skipped; returning
+ * {@code true} is always safe.
+ */
+public class ExpressionEvaluators {
+
+  /**
+   * Converts specific call expression list to the evaluator list.
+   *
+   * @param exprs the filter expressions, each expected to be a {@link CallExpression}
+   * @return one evaluator per expression, in the same order
+   */
+  public static List<Evaluator> fromExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .map(e -> fromExpression((CallExpression) e))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Converts specific call expression to the evaluator.
+   * <p>Two steps to bind the call:
+   * 1. map the evaluator instance;
+   * 2. bind the field reference;
+   *
+   * <p>Normalize the expression to simplify the subsequent decision logic:
+   * always put the literal expression in the RHS.
+   *
+   * <p>Calls (other than NOT/AND/OR/IS NULL/IS NOT NULL) with a NULL literal operand are mapped
+   * to {@link AlwaysFalse}. NOTE(review): for IN this is stricter than SQL semantics
+   * ({@code col IN (NULL, 1)} is TRUE when col = 1) - confirm such filters cannot reach here.
+   *
+   * @param expr the call expression
+   * @return the evaluator for {@code expr}
+   * @throws AssertionError if the function of {@code expr} is not supported
+   */
+  public static Evaluator fromExpression(CallExpression expr) {
+    FunctionDefinition funDef = expr.getFunctionDefinition();
+    List<Expression> childExprs = expr.getChildren();
+
+    boolean normalized = childExprs.get(0) instanceof FieldReferenceExpression;
+
+    if (BuiltInFunctionDefinitions.NOT.equals(funDef)) {
+      Not evaluator = Not.getInstance();
+      Evaluator childEvaluator = fromExpression((CallExpression) childExprs.get(0));
+      return evaluator.bindEvaluator(childEvaluator);
+    }
+
+    if (BuiltInFunctionDefinitions.AND.equals(funDef)) {
+      And evaluator = And.getInstance();
+      Evaluator evaluator1 = fromExpression((CallExpression) childExprs.get(0));
+      Evaluator evaluator2 = fromExpression((CallExpression) childExprs.get(1));
+      return evaluator.bindEvaluator(evaluator1, evaluator2);
+    }
+
+    if (BuiltInFunctionDefinitions.OR.equals(funDef)) {
+      Or evaluator = Or.getInstance();
+      Evaluator evaluator1 = fromExpression((CallExpression) childExprs.get(0));
+      Evaluator evaluator2 = fromExpression((CallExpression) childExprs.get(1));
+      return evaluator.bindEvaluator(evaluator1, evaluator2);
+    }
+
+    // handle unary operators
+    if (BuiltInFunctionDefinitions.IS_NULL.equals(funDef)) {
+      FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
+      return IsNull.getInstance()
+          .bindFieldReference(rExpr);
+    } else if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(funDef)) {
+      FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
+      return IsNotNull.getInstance()
+          .bindFieldReference(rExpr);
+    }
+
+    boolean hasNullLiteral =
+        childExprs.stream().anyMatch(e ->
+            e instanceof ValueLiteralExpression
+                && ExpressionUtils.getValueFromLiteral((ValueLiteralExpression) e) == null);
+    if (hasNullLiteral) {
+      return AlwaysFalse.getInstance();
+    }
+
+    // handle IN specifically
+    if (BuiltInFunctionDefinitions.IN.equals(funDef)) {
+      ValidationUtils.checkState(normalized, "The IN expression expects to be normalized");
+      In in = In.getInstance();
+      FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
+      in.bindFieldReference(rExpr);
+      in.bindVals(getInLiteralVals(childExprs));
+      return in;
+    }
+
+    NullFalseEvaluator evaluator;
+    // handle binary operators
+    if (BuiltInFunctionDefinitions.EQUALS.equals(funDef)) {
+      evaluator = EqualTo.getInstance();
+    } else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(funDef)) {
+      evaluator = NotEqualTo.getInstance();
+    } else if (BuiltInFunctionDefinitions.LESS_THAN.equals(funDef)) {
+      evaluator = normalized ? LessThan.getInstance() : GreaterThan.getInstance();
+    } else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(funDef)) {
+      evaluator = normalized ? GreaterThan.getInstance() : LessThan.getInstance();
+    } else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(funDef)) {
+      evaluator = normalized ? LessThanOrEqual.getInstance() : GreaterThanOrEqual.getInstance();
+    } else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(funDef)) {
+      evaluator = normalized ? GreaterThanOrEqual.getInstance() : LessThanOrEqual.getInstance();
+    } else {
+      throw new AssertionError("Unexpected function definition " + funDef);
+    }
+    FieldReferenceExpression rExpr = normalized
+        ? (FieldReferenceExpression) childExprs.get(0)
+        : (FieldReferenceExpression) childExprs.get(1);
+    ValueLiteralExpression vExpr = normalized
+        ? (ValueLiteralExpression) childExprs.get(1)
+        : (ValueLiteralExpression) childExprs.get(0);
+    evaluator
+        .bindVal(vExpr)
+        .bindFieldReference(rExpr);
+    return evaluator;
+  }
+
+  /**
+   * Decides whether it's possible to match based on the column values and column stats.
+   * The evaluator can be nested.
+   */
+  public interface Evaluator extends Serializable {
+
+    /**
+     * Decides whether it's possible to match based on the column stats.
+     *
+     * @param columnStatsMap column statistics keyed by column name
+     * @return false only if no row summarized by the stats can satisfy the predicate,
+     *     true if some row might
+     */
+    boolean eval(Map<String, ColumnStats> columnStatsMap);
+
+    /**
+     * Returns an evaluator for the logical negation of this predicate.
+     *
+     * <p>The result must itself be a sound "may match" evaluator for {@code NOT this}; simply
+     * inverting {@link #eval} is not sound, e.g. for stats [1, 10] both {@code col = 5} and
+     * {@code col <> 5} may match. The default never prunes, which is always safe.
+     */
+    default Evaluator negate() {
+      return AlwaysTrue.getInstance();
+    }
+  }
+
+  /**
+   * Leaf evaluator which depends on the given field.
+   */
+  public abstract static class LeafEvaluator implements Evaluator {
+
+    // referenced field type
+    protected LogicalType type;
+
+    // referenced field name
+    protected String name;
+
+    // referenced field index
+    protected int index;
+
+    public LeafEvaluator bindFieldReference(FieldReferenceExpression expr) {
+      this.type = expr.getOutputDataType().getLogicalType();
+      this.name = expr.getName();
+      this.index = expr.getFieldIndex();
+      return this;
+    }
+
+    /**
+     * Copies the field reference bound to this evaluator into {@code target}.
+     */
+    protected <T extends LeafEvaluator> T copyFieldReferenceTo(T target) {
+      target.type = this.type;
+      target.name = this.name;
+      target.index = this.index;
+      return target;
+    }
+
+    protected ColumnStats getColumnStats(Map<String, ColumnStats> columnStatsMap) {
+      ColumnStats columnStats = columnStatsMap.get(this.name);
+      ValidationUtils.checkState(
+          columnStats != null,
+          "Can not find column " + this.name);
+      return columnStats;
+    }
+  }
+
+  /**
+   * Leaf evaluator which compares the field value with literal values.
+   */
+  public abstract static class NullFalseEvaluator extends LeafEvaluator {
+
+    // the constant literal value
+    protected Object val;
+
+    public NullFalseEvaluator bindVal(ValueLiteralExpression vExpr) {
+      this.val = ExpressionUtils.getValueFromLiteral(vExpr);
+      return this;
+    }
+
+    /**
+     * Copies the field reference and the literal value of this evaluator into {@code target}.
+     */
+    protected <T extends NullFalseEvaluator> T copyTo(T target) {
+      copyFieldReferenceTo(target);
+      target.val = this.val;
+      return target;
+    }
+
+    @Override
+    public final boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      if (this.val == null) {
+        return false;
+      } else {
+        return eval(this.val, getColumnStats(columnStatsMap), this.type);
+      }
+    }
+
+    protected abstract boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type);
+  }
+
+  /**
+   * To evaluate = expr.
+   */
+  public static class EqualTo extends NullFalseEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static EqualTo getInstance() {
+      return new EqualTo();
+    }
+
+    @Override
+    protected boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type) {
+      Object minVal = columnStats.getMinVal();
+      Object maxVal = columnStats.getMaxVal();
+      if (minVal == null || maxVal == null) {
+        return false;
+      }
+      if (compare(minVal, val, type) > 0) {
+        return false;
+      }
+      return compare(maxVal, val, type) >= 0;
+    }
+
+    @Override
+    public Evaluator negate() {
+      return copyTo(NotEqualTo.getInstance());
+    }
+  }
+
+  /**
+   * To evaluate <> expr.
+   */
+  public static class NotEqualTo extends NullFalseEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static NotEqualTo getInstance() {
+      return new NotEqualTo();
+    }
+
+    @Override
+    protected boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type) {
+      // because the bounds are not necessarily a min or max value, this cannot be answered using them.
+      // notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
+      return true;
+    }
+
+    @Override
+    public Evaluator negate() {
+      return copyTo(EqualTo.getInstance());
+    }
+  }
+
+  /**
+   * To evaluate IS NULL expr.
+   */
+  public static class IsNull extends LeafEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static IsNull getInstance() {
+      return new IsNull();
+    }
+
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      ColumnStats columnStats = getColumnStats(columnStatsMap);
+      return columnStats.getNullCnt() > 0;
+    }
+
+    @Override
+    public Evaluator negate() {
+      return copyFieldReferenceTo(IsNotNull.getInstance());
+    }
+  }
+
+  /**
+   * To evaluate IS NOT NULL expr.
+   */
+  public static class IsNotNull extends LeafEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static IsNotNull getInstance() {
+      return new IsNotNull();
+    }
+
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      ColumnStats columnStats = getColumnStats(columnStatsMap);
+      // should consider FLOAT/DOUBLE & NAN
+      return columnStats.getMinVal() != null || columnStats.getNullCnt() <= 0;
+    }
+
+    @Override
+    public Evaluator negate() {
+      return copyFieldReferenceTo(IsNull.getInstance());
+    }
+  }
+
+  /**
+   * To evaluate < expr.
+   */
+  public static class LessThan extends NullFalseEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static LessThan getInstance() {
+      return new LessThan();
+    }
+
+    @Override
+    public boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type) {
+      Object minVal = columnStats.getMinVal();
+      if (minVal == null) {
+        return false;
+      }
+      return compare(minVal, val, type) < 0;
+    }
+
+    @Override
+    public Evaluator negate() {
+      return copyTo(GreaterThanOrEqual.getInstance());
+    }
+  }
+
+  /**
+   * To evaluate > expr.
+   */
+  public static class GreaterThan extends NullFalseEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static GreaterThan getInstance() {
+      return new GreaterThan();
+    }
+
+    @Override
+    protected boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type) {
+      Object maxVal = columnStats.getMaxVal();
+      if (maxVal == null) {
+        return false;
+      }
+      return compare(maxVal, val, type) > 0;
+    }
+
+    @Override
+    public Evaluator negate() {
+      return copyTo(LessThanOrEqual.getInstance());
+    }
+  }
+
+  /**
+   * To evaluate <= expr.
+   */
+  public static class LessThanOrEqual extends NullFalseEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static LessThanOrEqual getInstance() {
+      return new LessThanOrEqual();
+    }
+
+    @Override
+    protected boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type) {
+      Object minVal = columnStats.getMinVal();
+      if (minVal == null) {
+        return false;
+      }
+      return compare(minVal, val, type) <= 0;
+    }
+
+    @Override
+    public Evaluator negate() {
+      return copyTo(GreaterThan.getInstance());
+    }
+  }
+
+  /**
+   * To evaluate >= expr.
+   */
+  public static class GreaterThanOrEqual extends NullFalseEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static GreaterThanOrEqual getInstance() {
+      return new GreaterThanOrEqual();
+    }
+
+    @Override
+    protected boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type) {
+      Object maxVal = columnStats.getMaxVal();
+      if (maxVal == null) {
+        return false;
+      }
+      return compare(maxVal, val, type) >= 0;
+    }
+
+    @Override
+    public Evaluator negate() {
+      return copyTo(LessThan.getInstance());
+    }
+  }
+
+  /**
+   * To evaluate IN expr.
+   *
+   * <p>NOT IN cannot be decided from min/max stats, so the default {@link Evaluator#negate()}
+   * (never prune) is kept.
+   */
+  public static class In extends LeafEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    private static final int IN_PREDICATE_LIMIT = 200;
+
+    public static In getInstance() {
+      return new In();
+    }
+
+    private Object[] vals;
+
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      if (Arrays.stream(vals).anyMatch(Objects::isNull)) {
+        return false;
+      }
+      ColumnStats columnStats = getColumnStats(columnStatsMap);
+      Object minVal = columnStats.getMinVal();
+      Object maxVal = columnStats.getMaxVal();
+      if (minVal == null || maxVal == null) {
+        return false; // values are all null and literalSet cannot contain null.
+      }
+
+      if (vals.length > IN_PREDICATE_LIMIT) {
+        // skip evaluating the predicate if the number of values is too big
+        return true;
+      }
+
+      return Arrays.stream(vals).anyMatch(v ->
+          compare(minVal, v, this.type) <= 0 && compare(maxVal, v, this.type) >= 0);
+    }
+
+    public void bindVals(Object... vals) {
+      this.vals = vals;
+    }
+  }
+
+  /**
+   * A special evaluator which is not possible to match any condition.
+   *
+   * <p>Its negation keeps the default {@link Evaluator#negate()} (never prune), which is
+   * always safe.
+   */
+  public static class AlwaysFalse implements Evaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static AlwaysFalse getInstance() {
+      return new AlwaysFalse();
+    }
+
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      return false;
+    }
+  }
+
+  /**
+   * A special evaluator which can never rule out a match, used when a predicate cannot be
+   * decided from the column stats.
+   */
+  public static class AlwaysTrue implements Evaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static AlwaysTrue getInstance() {
+      return new AlwaysTrue();
+    }
+
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      return true;
+    }
+
+    @Override
+    public Evaluator negate() {
+      // "cannot decide" stays "cannot decide" under negation
+      return this;
+    }
+  }
+
+  // component predicate
+
+  /**
+   * To evaluate NOT expr.
+   *
+   * <p>The negation is pushed down into the operand via {@link Evaluator#negate()} when the
+   * operand is bound. Inverting the operand's "may match" answer would be unsound, e.g. for
+   * stats [1, 10] both {@code col = 5} and {@code NOT (col = 5)} may match.
+   */
+  public static class Not implements Evaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static Not getInstance() {
+      return new Not();
+    }
+
+    // the operand of the NOT
+    private Evaluator evaluator;
+
+    // the operand with the negation pushed down; this is what gets evaluated
+    private Evaluator negated;
+
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      return this.negated.eval(columnStatsMap);
+    }
+
+    @Override
+    public Evaluator negate() {
+      // NOT (NOT x) is x
+      return this.evaluator;
+    }
+
+    public Evaluator bindEvaluator(Evaluator evaluator) {
+      this.evaluator = evaluator;
+      this.negated = evaluator.negate();
+      return this;
+    }
+  }
+
+  /**
+   * To evaluate AND expr.
+   */
+  public static class And implements Evaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static And getInstance() {
+      return new And();
+    }
+
+    private Evaluator[] evaluators;
+
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      for (Evaluator evaluator : evaluators) {
+        if (!evaluator.eval(columnStatsMap)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public Evaluator negate() {
+      // De Morgan: NOT (a AND b) is (NOT a) OR (NOT b)
+      return Or.getInstance().bindEvaluator(negateAll(this.evaluators));
+    }
+
+    public Evaluator bindEvaluator(Evaluator... evaluators) {
+      this.evaluators = evaluators;
+      return this;
+    }
+  }
+
+  /**
+   * To evaluate OR expr.
+   */
+  public static class Or implements Evaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static Or getInstance() {
+      return new Or();
+    }
+
+    private Evaluator[] evaluators;
+
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      for (Evaluator evaluator : evaluators) {
+        if (evaluator.eval(columnStatsMap)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public Evaluator negate() {
+      // De Morgan: NOT (a OR b) is (NOT a) AND (NOT b)
+      return And.getInstance().bindEvaluator(negateAll(this.evaluators));
+    }
+
+    public Evaluator bindEvaluator(Evaluator... evaluators) {
+      this.evaluators = evaluators;
+      return this;
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  /**
+   * Returns the IN expression literal values.
+   */
+  private static Object[] getInLiteralVals(List<Expression> childExprs) {
+    List<Object> vals = new ArrayList<>();
+    for (int i = 1; i < childExprs.size(); i++) {
+      vals.add(ExpressionUtils.getValueFromLiteral((ValueLiteralExpression) childExprs.get(i)));
+    }
+    return vals.toArray();
+  }
+
+  /**
+   * Returns the negation of each of the given evaluators, in the same order.
+   */
+  private static Evaluator[] negateAll(Evaluator[] evaluators) {
+    return Arrays.stream(evaluators)
+        .map(Evaluator::negate)
+        .toArray(Evaluator[]::new);
+  }
+
+  /**
+   * Compares two non-null values of the given logical type.
+   *
+   * <p>Integral and temporal values are compared by their {@code long} value so that values
+   * boxed as different integral types (e.g. {@link Integer} and {@link Long}) still compare
+   * correctly.
+   *
+   * @return a negative integer, zero, or a positive integer as {@code val1} is less than,
+   *     equal to, or greater than {@code val2}
+   * @throws UnsupportedOperationException if {@code logicalType} is not supported
+   */
+  private static int compare(@NotNull Object val1, @NotNull Object val2, LogicalType logicalType) {
+    switch (logicalType.getTypeRoot()) {
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIME_WITHOUT_TIME_ZONE:
+      case DATE:
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+      case BIGINT:
+        return Long.compare(((Number) val1).longValue(), ((Number) val2).longValue());
+      case BOOLEAN:
+        return ((Boolean) val1).compareTo((Boolean) val2);
+      case FLOAT:
+        return ((Float) val1).compareTo((Float) val2);
+      case DOUBLE:
+        return ((Double) val1).compareTo((Double) val2);
+      case BINARY:
+      case VARBINARY:
+        return compareBytes((byte[]) val1, (byte[]) val2);
+      case CHAR:
+      case VARCHAR:
+        return ((String) val1).compareTo((String) val2);
+      case DECIMAL:
+        return ((BigDecimal) val1).compareTo((BigDecimal) val2);
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + logicalType);
+    }
+  }
+
+  /**
+   * Compares two byte arrays lexicographically using signed byte values; a proper prefix is
+   * smaller than the longer array.
+   */
+  private static int compareBytes(byte[] v1, byte[] v2) {
+    int len1 = v1.length;
+    int len2 = v2.length;
+    int lim = Math.min(len1, len2);
+
+    int k = 0;
+    while (k < lim) {
+      byte c1 = v1[k];
+      byte c2 = v2[k];
+      if (c1 != c2) {
+        return c1 - c2;
+      }
+      k++;
+    }
+    return len1 - len2;
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index 9ee93719813..daedaf8fa3c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -25,9 +25,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.source.stats.ColumnStatsIndices;
-import org.apache.hudi.source.stats.ExpressionEvaluator;
 import org.apache.hudi.util.DataTypeUtils;
-import org.apache.hudi.util.ExpressionUtils;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -67,6 +65,7 @@ public class FileIndex {
   private List<String> partitionPaths;      // cache of partition paths
   private List<ResolvedExpression> filters; // push down filters
   private final boolean tableExists;
+  private DataPruner dataPruner;
 
   private FileIndex(Path path, Configuration conf, RowType rowType) {
     this.path = path;
@@ -145,9 +144,16 @@ public class FileIndex {
       // no need to filter by col stats or error occurs.
       return allFileStatus;
     }
-    return Arrays.stream(allFileStatus).parallel()
+    FileStatus[] results = Arrays.stream(allFileStatus).parallel()
         .filter(fileStatus -> 
candidateFiles.contains(fileStatus.getPath().getName()))
         .toArray(FileStatus[]::new);
+    double totalFileSize = allFileStatus.length;
+    double resultFileSize = results.length;
+    double skippingPercent = totalFileSize != 0 ? (totalFileSize - 
resultFileSize) / totalFileSize : 0;
+    LOG.info("Total files: " + totalFileSize
+        + "; candidate files after data skipping: " + resultFileSize
+        + "; skipping percent " + skippingPercent);
+    return results;
   }
 
   /**
@@ -191,6 +197,7 @@ public class FileIndex {
   public void setFilters(List<ResolvedExpression> filters) {
     if (filters.size() > 0) {
       this.filters = new ArrayList<>(filters);
+      this.dataPruner = initializeDataPruner(filters);
     }
   }
 
@@ -213,24 +220,11 @@ public class FileIndex {
    */
   @Nullable
   private Set<String> candidateFilesInMetadataTable(FileStatus[] 
allFileStatus) {
-    // NOTE: Data Skipping is only effective when it references columns that 
are indexed w/in
-    //       the Column Stats Index (CSI). Following cases could not be 
effectively handled by Data Skipping:
-    //          - Expressions on top-level column's fields (ie, for ex filters 
like "struct.field > 0", since
-    //          CSI only contains stats for top-level columns, in this case 
for "struct")
-    //          - Any expression not directly referencing top-level column 
(for ex, sub-queries, since there's
-    //          nothing CSI in particular could be applied for)
-    if (!metadataConfig.enabled() || !dataSkippingEnabled) {
-      validateConfig();
-      return null;
-    }
-    if (this.filters == null || this.filters.size() == 0) {
-      return null;
-    }
-    String[] referencedCols = ExpressionUtils.referencedColumns(filters);
-    if (referencedCols.length == 0) {
+    if (dataPruner == null) {
       return null;
     }
     try {
+      String[] referencedCols = dataPruner.getReferencedCols();
       final List<RowData> colStats = 
ColumnStatsIndices.readColumnStatsIndex(path.toString(), metadataConfig, 
referencedCols);
       final Pair<List<RowData>, String[]> colStatsTable = 
ColumnStatsIndices.transposeColumnStatsIndex(colStats, referencedCols, rowType);
       List<RowData> transposedColStats = colStatsTable.getLeft();
@@ -245,7 +239,7 @@ public class FileIndex {
           .map(row -> row.getString(0).toString())
           .collect(Collectors.toSet());
       Set<String> candidateFileNames = transposedColStats.stream().parallel()
-          .filter(row -> ExpressionEvaluator.filterExprs(filters, row, 
queryFields))
+          .filter(row -> dataPruner.test(row, queryFields))
           .map(row -> row.getString(0).toString())
           .collect(Collectors.toSet());
 
@@ -303,4 +297,18 @@ public class FileIndex {
   public List<ResolvedExpression> getFilters() {
     return filters;
   }
+
+  private DataPruner initializeDataPruner(List<ResolvedExpression> filters) {
+    // NOTE: Data Skipping is only effective when it references columns that 
are indexed w/in
+    //       the Column Stats Index (CSI). Following cases could not be 
effectively handled by Data Skipping:
+    //          - Expressions on top-level column's fields (ie, for ex filters 
like "struct.field > 0", since
+    //          CSI only contains stats for top-level columns, in this case 
for "struct")
+    //          - Any expression not directly referencing top-level column 
(for ex, sub-queries, since there's
+    //          nothing CSI in particular could be applied for)
+    if (!metadataConfig.enabled() || !dataSkippingEnabled) {
+      validateConfig();
+      return null;
+    }
+    return DataPruner.newInstance(filters);
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStats.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStats.java
new file mode 100644
index 00000000000..7ec5371ba12
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStats.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Column statistics of a single column: its min value, max value and null count.
+ *
+ * <p>Instances are immutable. The min/max values are plain Java objects whose runtime type
+ * depends on the column's logical type, and either may be {@code null}. Binary columns may
+ * carry {@code byte[]} bounds (TODO confirm against the column stats converter), so
+ * {@link #equals(Object)}, {@link #hashCode()} and {@link #toString()} treat array values by
+ * content rather than by identity.
+ */
+public class ColumnStats {
+
+  /** Minimum column value, may be {@code null}. */
+  @Nullable private final Object minVal;
+  /** Maximum column value, may be {@code null}. */
+  @Nullable private final Object maxVal;
+  /** Number of null values of the column. */
+  private final long nullCnt;
+
+  /**
+   * Creates the column statistics.
+   *
+   * @param minVal  the minimum column value, nullable
+   * @param maxVal  the maximum column value, nullable
+   * @param nullCnt the number of null values of the column
+   */
+  public ColumnStats(@Nullable Object minVal, @Nullable Object maxVal, long nullCnt) {
+    this.minVal = minVal;
+    this.maxVal = maxVal;
+    this.nullCnt = nullCnt;
+  }
+
+  /** Returns the minimum column value, or {@code null} if absent. */
+  @Nullable
+  public Object getMinVal() {
+    return minVal;
+  }
+
+  /** Returns the maximum column value, or {@code null} if absent. */
+  @Nullable
+  public Object getMaxVal() {
+    return maxVal;
+  }
+
+  /** Returns the number of null values of the column. */
+  public long getNullCnt() {
+    return nullCnt;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ColumnStats that = (ColumnStats) o;
+    // deepEquals compares array bounds (e.g. byte[]) by content; Objects.equals would compare
+    // them by reference and report equal stats as different.
+    return nullCnt == that.nullCnt
+        && Objects.deepEquals(minVal, that.minVal)
+        && Objects.deepEquals(maxVal, that.maxVal);
+  }
+
+  @Override
+  public int hashCode() {
+    // Must agree with equals(): hashes arrays by content. For non-array values this yields the
+    // same result as Objects.hash(minVal, maxVal, nullCnt).
+    return Arrays.deepHashCode(new Object[] {minVal, maxVal, nullCnt});
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnStats{minVal=" + valueToString(minVal)
+        + ", maxVal=" + valueToString(maxVal)
+        + ", nullCnt=" + nullCnt + '}';
+  }
+
+  /** Renders a bound value, printing array contents instead of the array identity. */
+  private static String valueToString(@Nullable Object val) {
+    String rendered = Arrays.deepToString(new Object[] {val});
+    // strip the brackets added by the single-element wrapper array
+    return rendered.substring(1, rendered.length() - 1);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
deleted file mode 100644
index d1bb01126a0..00000000000
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
+++ /dev/null
@@ -1,605 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.stats;
-
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.util.ExpressionUtils;
-
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.CallExpression;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ValueLiteralExpression;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
-
-import javax.validation.constraints.NotNull;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Tool to evaluate the {@link org.apache.flink.table.expressions.ResolvedExpression}s
- * against the column stats held by one column stats index row, to decide whether the
- * file described by that row may contain matching records.
- *
- * <p>NOTE(review): this file is deleted by this commit; per the commit summary its
- * evaluators and utilities move to {@code org.apache.hudi.source.ExpressionEvaluators}.
- */
-public class ExpressionEvaluator {
-  // IN lists with more values than this are not checked against the stats: In.eval()
-  // treats them as a possible match.
-  private static final int IN_PREDICATE_LIMIT = 200;
-
-  /**
-   * Filter the index row with specific data filters and query fields.
-   *
-   * <p>The filters are combined with AND: the row stays a candidate only if every filter
-   * evaluates to {@code true}. Each filter is cast to {@link CallExpression} and bound into a
-   * new evaluator tree on every call, i.e. once per index row.
-   *
-   * @param filters     The pushed down data filters
-   * @param indexRow    The index row
-   * @param queryFields The query fields referenced by the filters
-   * @return true if the index row should be considered as a candidate
-   */
-  public static boolean filterExprs(List<ResolvedExpression> filters, RowData 
indexRow, RowType.RowField[] queryFields) {
-    for (ResolvedExpression filter : filters) {
-      if (!Evaluator.bindCall((CallExpression) filter, indexRow, 
queryFields).eval()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Used for deciding whether the literal values match the column stats.
-   * The evaluator can be nested.
-   *
-   * <p>Intended contract of {@link #eval()}: {@code false} only when the bound column stats
-   * prove that no row can match, {@code true} when some row may match.
-   */
-  public abstract static class Evaluator {
-    // the constant literal value
-    protected Object val;
-
-    // column stats
-    protected Object minVal;
-    protected Object maxVal;
-    protected long nullCnt = 0;
-
-    // referenced field type
-    protected LogicalType type;
-
-    /**
-     * Binds the evaluator with specific call expression.
-     *
-     * <p>Three steps to bind the call:
-     * 1. map the evaluator instance;
-     * 2. bind the field reference;
-     * 3. bind the column stats.
-     *
-     * <p>Normalize the expression to simplify the subsequent decision logic:
-     * always put the literal expression in the RHS.
-     */
-    public static Evaluator bindCall(CallExpression call, RowData indexRow, 
RowType.RowField[] queryFields) {
-      FunctionDefinition funDef = call.getFunctionDefinition();
-      List<Expression> childExprs = call.getChildren();
-
-      // "normalized" means the field reference is the first operand (literal on the RHS)
-      boolean normalized = childExprs.get(0) instanceof 
FieldReferenceExpression;
-      final Evaluator evaluator;
-
-      if (BuiltInFunctionDefinitions.NOT.equals(funDef)) {
-        evaluator = Not.getInstance();
-        Evaluator childEvaluator = bindCall((CallExpression) 
childExprs.get(0), indexRow, queryFields);
-        return ((Not) evaluator).bindEvaluator(childEvaluator);
-      }
-
-      // NOTE(review): AND/OR bind only children 0 and 1, i.e. they assume binary calls.
-      if (BuiltInFunctionDefinitions.AND.equals(funDef)) {
-        evaluator = And.getInstance();
-        Evaluator evaluator1 = bindCall((CallExpression) childExprs.get(0), 
indexRow, queryFields);
-        Evaluator evaluator2 = bindCall((CallExpression) childExprs.get(1), 
indexRow, queryFields);
-        return ((And) evaluator).bindEvaluator(evaluator1, evaluator2);
-      }
-
-      if (BuiltInFunctionDefinitions.OR.equals(funDef)) {
-        evaluator = Or.getInstance();
-        Evaluator evaluator1 = bindCall((CallExpression) childExprs.get(0), 
indexRow, queryFields);
-        Evaluator evaluator2 = bindCall((CallExpression) childExprs.get(1), 
indexRow, queryFields);
-        return ((Or) evaluator).bindEvaluator(evaluator1, evaluator2);
-      }
-
-      // handle unary operators
-      if (BuiltInFunctionDefinitions.IS_NULL.equals(funDef)) {
-        FieldReferenceExpression rExpr = (FieldReferenceExpression) 
childExprs.get(0);
-        return IsNull.getInstance()
-            .bindFieldReference(rExpr)
-            .bindColStats(indexRow, queryFields, rExpr);
-      } else if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(funDef)) {
-        FieldReferenceExpression rExpr = (FieldReferenceExpression) 
childExprs.get(0);
-        return IsNotNull.getInstance()
-            .bindFieldReference(rExpr)
-            .bindColStats(indexRow, queryFields, rExpr);
-      }
-
-      // A comparison with a NULL literal can never evaluate to TRUE, so no row can match.
-      // NOTE(review): this check also runs before the IN branch, so `col IN (1, NULL)` prunes
-      // every file although rows with col = 1 satisfy it under SQL semantics; confirm whether
-      // the planner strips NULLs from IN lists before they reach this point.
-      boolean hasNullLiteral =
-          childExprs.stream().anyMatch(e ->
-             e instanceof ValueLiteralExpression
-                 && 
ExpressionUtils.getValueFromLiteral((ValueLiteralExpression) e) == null);
-      if (hasNullLiteral) {
-        evaluator = AlwaysFalse.getInstance();
-        return evaluator;
-      }
-
-      // handle IN specifically
-      if (BuiltInFunctionDefinitions.IN.equals(funDef)) {
-        ValidationUtils.checkState(normalized, "The IN expression expects to 
be normalized");
-        evaluator = In.getInstance();
-        FieldReferenceExpression rExpr = (FieldReferenceExpression) 
childExprs.get(0);
-        evaluator.bindFieldReference(rExpr);
-        ((In) evaluator).bindVals(getInLiteralVals(childExprs));
-        return evaluator.bindColStats(indexRow, queryFields, rExpr);
-      }
-
-      // handle binary operators
-      // When the literal is on the LHS (not normalized), range operators are mirrored,
-      // e.g. "5 < col" is evaluated as "col > 5".
-      if (BuiltInFunctionDefinitions.EQUALS.equals(funDef)) {
-        evaluator = EqualTo.getInstance();
-      } else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(funDef)) {
-        evaluator = NotEqualTo.getInstance();
-      } else if (BuiltInFunctionDefinitions.LESS_THAN.equals(funDef)) {
-        evaluator = normalized ? LessThan.getInstance() : 
GreaterThan.getInstance();
-      } else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(funDef)) {
-        evaluator = normalized ? GreaterThan.getInstance() : 
LessThan.getInstance();
-      } else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(funDef)) 
{
-        evaluator = normalized ? LessThanOrEqual.getInstance() : 
GreaterThanOrEqual.getInstance();
-      } else if 
(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(funDef)) {
-        evaluator = normalized ? GreaterThanOrEqual.getInstance() : 
LessThanOrEqual.getInstance();
-      } else {
-        throw new AssertionError("Unexpected function definition " + funDef);
-      }
-      FieldReferenceExpression rExpr = normalized
-          ? (FieldReferenceExpression) childExprs.get(0)
-          : (FieldReferenceExpression) childExprs.get(1);
-      ValueLiteralExpression vExpr = normalized
-          ? (ValueLiteralExpression) childExprs.get(1)
-          : (ValueLiteralExpression) childExprs.get(0);
-      evaluator
-          .bindFieldReference(rExpr)
-          .bindVal(vExpr)
-          .bindColStats(indexRow, queryFields, rExpr);
-      return evaluator;
-    }
-
-    /**
-     * Reads the min value, max value and null count of the column referenced by {@code expr}
-     * from {@code indexRow}. The row holds two leading fields followed by one
-     * (min, max, null count) triple per query field, in {@code queryFields} order.
-     *
-     * <p>NOTE(review): the lookup loop does not break, so if a name repeats the last match wins.
-     */
-    public Evaluator bindColStats(
-        RowData indexRow,
-        RowType.RowField[] queryFields,
-        FieldReferenceExpression expr) {
-      int colPos = -1;
-      for (int i = 0; i < queryFields.length; i++) {
-        if (expr.getName().equals(queryFields[i].getName())) {
-          colPos = i;
-        }
-      }
-      ValidationUtils.checkState(colPos != -1, "Can not find column " + 
expr.getName());
-      int startPos = 2 + colPos * 3;
-      LogicalType colType = queryFields[colPos].getType();
-      Object minVal = indexRow.isNullAt(startPos) ? null : 
getValAsJavaObj(indexRow, startPos, colType);
-      Object maxVal = indexRow.isNullAt(startPos + 1) ? null : 
getValAsJavaObj(indexRow, startPos + 1, colType);
-      long nullCnt = indexRow.getLong(startPos + 2);
-
-      this.minVal = minVal;
-      this.maxVal = maxVal;
-      this.nullCnt = nullCnt;
-      return this;
-    }
-
-    public Evaluator bindVal(ValueLiteralExpression vExpr) {
-      this.val = ExpressionUtils.getValueFromLiteral(vExpr);
-      return this;
-    }
-
-    public Evaluator bindFieldReference(FieldReferenceExpression expr) {
-      this.type = expr.getOutputDataType().getLogicalType();
-      return this;
-    }
-
-    /**
-     * Evaluates the bound expression against the bound column stats.
-     *
-     * @return {@code false} if the bound stats rule out any match, {@code true} otherwise
-     */
-    public abstract boolean eval();
-  }
-
-  /**
-   * Base for comparison evaluators that never match when the bound literal is {@code null}.
-   */
-  public abstract static class NullFalseEvaluator extends Evaluator {
-
-    @Override
-    public final boolean eval() {
-      if (this.val == null) {
-        return false;
-      } else {
-        return eval(this.val);
-      }
-    }
-
-    protected abstract boolean eval(@NotNull Object val);
-  }
-
-  /**
-   * To evaluate = expr: may match when the literal lies within [min, max].
-   */
-  public static class EqualTo extends NullFalseEvaluator {
-
-    public static EqualTo getInstance() {
-      return new EqualTo();
-    }
-
-    @Override
-    protected boolean eval(@NotNull Object val) {
-      if (this.minVal == null || this.maxVal == null) {
-        return false;
-      }
-      if (compare(this.minVal, val, this.type) > 0) {
-        return false;
-      }
-      return compare(this.maxVal, val, this.type) >= 0;
-    }
-  }
-
-  /**
-   * To evaluate <> expr.
-   */
-  public static class NotEqualTo extends NullFalseEvaluator {
-    public static NotEqualTo getInstance() {
-      return new NotEqualTo();
-    }
-
-    @Override
-    protected boolean eval(@NotNull Object val) {
-      // because the bounds are not necessarily a min or max value, this cannot be answered using
-      // them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
-      return true;
-    }
-  }
-
-  /**
-   * To evaluate IS NULL expr.
-   */
-  public static class IsNull extends Evaluator {
-    public static IsNull getInstance() {
-      return new IsNull();
-    }
-
-    @Override
-    public boolean eval() {
-      return this.nullCnt > 0;
-    }
-  }
-
-  /**
-   * To evaluate IS NOT NULL expr.
-   */
-  public static class IsNotNull extends Evaluator {
-    public static IsNotNull getInstance() {
-      return new IsNotNull();
-    }
-
-    @Override
-    public boolean eval() {
-      // should consider FLOAT/DOUBLE & NAN
-      // Pruned only when there is no min value and some nulls (presumably: all values null).
-      return this.minVal != null || this.nullCnt <= 0;
-    }
-  }
-
-  /**
-   * To evaluate < expr.
-   */
-  public static class LessThan extends NullFalseEvaluator {
-    public static LessThan getInstance() {
-      return new LessThan();
-    }
-
-    @Override
-    public boolean eval(@NotNull Object val) {
-      if (this.minVal == null) {
-        return false;
-      }
-      return compare(this.minVal, val, this.type) < 0;
-    }
-  }
-
-  /**
-   * To evaluate > expr.
-   */
-  public static class GreaterThan extends NullFalseEvaluator {
-    public static GreaterThan getInstance() {
-      return new GreaterThan();
-    }
-
-    @Override
-    protected boolean eval(@NotNull Object val) {
-      if (this.maxVal == null) {
-        return false;
-      }
-      return compare(this.maxVal, val, this.type) > 0;
-    }
-  }
-
-  /**
-   * To evaluate <= expr.
-   */
-  public static class LessThanOrEqual extends NullFalseEvaluator {
-    public static LessThanOrEqual getInstance() {
-      return new LessThanOrEqual();
-    }
-
-    @Override
-    protected boolean eval(@NotNull Object val) {
-      if (this.minVal == null) {
-        return false;
-      }
-      return compare(this.minVal, val, this.type) <= 0;
-    }
-  }
-
-  /**
-   * To evaluate >= expr.
-   */
-  public static class GreaterThanOrEqual extends NullFalseEvaluator {
-    public static GreaterThanOrEqual getInstance() {
-      return new GreaterThanOrEqual();
-    }
-
-    @Override
-    protected boolean eval(@NotNull Object val) {
-      if (this.maxVal == null) {
-        return false;
-      }
-      return compare(this.maxVal, val, this.type) >= 0;
-    }
-  }
-
-  /**
-   * To evaluate IN expr.
-   */
-  public static class In extends Evaluator {
-    public static In getInstance() {
-      return new In();
-    }
-
-    private Object[] vals;
-
-    @Override
-    public boolean eval() {
-      if (Arrays.stream(vals).anyMatch(Objects::isNull)) {
-        return false;
-      }
-      if (this.minVal == null) {
-        return false; // values are all null and literalSet cannot contain null.
-      }
-
-      if (vals.length > IN_PREDICATE_LIMIT) {
-        // skip evaluating the predicate if the number of values is too big
-        return true;
-      }
-
-      // NOTE(review): this reassigns the bound values, so a second eval() sees the filtered
-      // list; harmless here only because bindCall creates a fresh evaluator per index row.
-      vals = Arrays.stream(vals).filter(v -> compare(this.minVal, v, 
this.type) <= 0).toArray();
-      if (vals.length == 0) { // if all values are less than lower bound, rows cannot match.
-        return false;
-      }
-
-      vals = Arrays.stream(vals).filter(v -> compare(this.maxVal, v, 
this.type) >= 0).toArray();
-      if (vals.length == 0) { // if all remaining values are greater than upper bound, rows cannot match.
-        return false;
-      }
-
-      return true;
-    }
-
-    public void bindVals(Object... vals) {
-      this.vals = vals;
-    }
-  }
-
-  // A special evaluator that never matches; bindCall uses it when a call has a NULL literal.
-  public static class AlwaysFalse extends Evaluator {
-
-    public static AlwaysFalse getInstance() {
-      return new AlwaysFalse();
-    }
-
-    @Override
-    public Evaluator bindColStats(
-        RowData indexRow,
-        RowType.RowField[] queryFields,
-        FieldReferenceExpression expr) {
-      // nothing to bind: the result does not depend on the column stats
-      return this;
-    }
-
-    @Override
-    public boolean eval() {
-      return false;
-    }
-  }
-
-  // component predicate
-
-  /**
-   * To evaluate NOT expr.
-   *
-   * <p>NOTE(review): negating the child is not a sound pruning rule, because a child result of
-   * {@code true} only means "some row may match". E.g. for {@code NOT (col = 5)} on a file with
-   * min 1 and max 10, {@link EqualTo} returns {@code true}, so this returns {@code false} and
-   * prunes the file although it may contain rows with {@code col = 3}.
-   */
-  public static class Not extends Evaluator {
-    public static Not getInstance() {
-      return new Not();
-    }
-
-    private Evaluator evaluator;
-
-    @Override
-    public boolean eval() {
-      return !this.evaluator.eval();
-    }
-
-    public Evaluator bindEvaluator(Evaluator evaluator) {
-      this.evaluator = evaluator;
-      return this;
-    }
-  }
-
-  /**
-   * To evaluate AND expr: may match only if every child may match.
-   */
-  public static class And extends Evaluator {
-    public static And getInstance() {
-      return new And();
-    }
-
-    private Evaluator[] evaluators;
-
-    @Override
-    public boolean eval() {
-      for (Evaluator evaluator : evaluators) {
-        if (!evaluator.eval()) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    public Evaluator bindEvaluator(Evaluator... evaluators) {
-      this.evaluators = evaluators;
-      return this;
-    }
-  }
-
-  /**
-   * To evaluate OR expr: may match if any child may match.
-   */
-  public static class Or extends Evaluator {
-    public static Or getInstance() {
-      return new Or();
-    }
-
-    private Evaluator[] evaluators;
-
-    @Override
-    public boolean eval() {
-      for (Evaluator evaluator : evaluators) {
-        if (evaluator.eval()) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    public Evaluator bindEvaluator(Evaluator... evaluators) {
-      this.evaluators = evaluators;
-      return this;
-    }
-  }
-
-  // -------------------------------------------------------------------------
-  //  Utilities
-  // -------------------------------------------------------------------------
-
-  /**
-   * Compares two non-null values (column stats or literal) of the given logical type.
-   *
-   * <p>NOTE(review): BIGINT is not listed and falls into the default branch, which throws;
-   * getValAsJavaObj rejects BIGINT as well. TINYINT/SMALLINT values are cast to
-   * {@link Integer}, which assumes the literal side is also an Integer; confirm against
-   * {@code ExpressionUtils.getValueFromLiteral}.
-   */
-  private static int compare(@NotNull Object val1, @NotNull Object val2, 
LogicalType logicalType) {
-    switch (logicalType.getTypeRoot()) {
-      case TIMESTAMP_WITHOUT_TIME_ZONE:
-      case TIME_WITHOUT_TIME_ZONE:
-      case DATE:
-        return ((Long) val1).compareTo((Long) val2);
-      case BOOLEAN:
-        return ((Boolean) val1).compareTo((Boolean) val2);
-      case TINYINT:
-      case SMALLINT:
-      case INTEGER:
-        return ((Integer) val1).compareTo((Integer) val2);
-      case FLOAT:
-        return ((Float) val1).compareTo((Float) val2);
-      case DOUBLE:
-        return ((Double) val1).compareTo((Double) val2);
-      case BINARY:
-      case VARBINARY:
-        return compareBytes((byte[]) val1, (byte[]) val2);
-      case CHAR:
-      case VARCHAR:
-        return ((String) val1).compareTo((String) val2);
-      case DECIMAL:
-        return ((BigDecimal) val1).compareTo((BigDecimal) val2);
-      default:
-        throw new UnsupportedOperationException("Unsupported type: " + 
logicalType);
-    }
-  }
-
-  /**
-   * Compares two byte arrays lexicographically using signed byte values; when one array is a
-   * prefix of the other, the shorter one sorts first.
-   */
-  private static int compareBytes(byte[] v1, byte[] v2) {
-    int len1 = v1.length;
-    int len2 = v2.length;
-    int lim = Math.min(len1, len2);
-
-    int k = 0;
-    while (k < lim) {
-      byte c1 = v1[k];
-      byte c2 = v2[k];
-      if (c1 != c2) {
-        return c1 - c2;
-      }
-      k++;
-    }
-    return len1 - len2;
-  }
-
-  /**
-   * Returns the IN expression literal values, i.e. every child after the first one (the
-   * field reference).
-   */
-  private static Object[] getInLiteralVals(List<Expression> childExprs) {
-    List<Object> vals = new ArrayList<>();
-    for (int i = 1; i < childExprs.size(); i++) {
-      vals.add(ExpressionUtils.getValueFromLiteral((ValueLiteralExpression) 
childExprs.get(i)));
-    }
-    return vals.toArray();
-  }
-
-  /**
-   * Returns the value as Java object at position {@code pos} of row {@code indexRow}.
-   *
-   * <p>The Java type depends on {@code colType}, e.g. timestamps become their
-   * {@code getMillisecond()} value as {@link Long}, strings become {@link String} and
-   * decimals become {@link BigDecimal}.
-   */
-  private static Object getValAsJavaObj(RowData indexRow, int pos, LogicalType 
colType) {
-    switch (colType.getTypeRoot()) {
-      // NOTE: Since we can't rely on Avro's "date", and "timestamp-micros" logical-types, we're
-      //       manually encoding corresponding values as int and long w/in the Column Stats Index and
-      //       here we have to decode those back into corresponding logical representation.
-      case TIMESTAMP_WITHOUT_TIME_ZONE:
-        TimestampType tsType = (TimestampType) colType;
-        return indexRow.getTimestamp(pos, 
tsType.getPrecision()).getMillisecond();
-      case TIME_WITHOUT_TIME_ZONE:
-      case DATE:
-        return indexRow.getLong(pos);
-      // NOTE: All integral types of size less than Int are encoded as Ints in MT
-      case BOOLEAN:
-        return indexRow.getBoolean(pos);
-      case TINYINT:
-      case SMALLINT:
-      case INTEGER:
-        return indexRow.getInt(pos);
-      case FLOAT:
-        return indexRow.getFloat(pos);
-      case DOUBLE:
-        return indexRow.getDouble(pos);
-      case BINARY:
-      case VARBINARY:
-        return indexRow.getBinary(pos);
-      case CHAR:
-      case VARCHAR:
-        return indexRow.getString(pos).toString();
-      case DECIMAL:
-        DecimalType decimalType = (DecimalType) colType;
-        return indexRow.getDecimal(pos, decimalType.getPrecision(), 
decimalType.getScale()).toBigDecimal();
-      default:
-        throw new UnsupportedOperationException("Unsupported type: " + 
colType);
-    }
-  }
-}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
new file mode 100644
index 00000000000..eef8e6df48c
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.source.DataPruner.convertColumnStats;
+import static org.apache.hudi.source.ExpressionEvaluators.fromExpression;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link 
org.apache.hudi.source.ExpressionEvaluators.Evaluator}.
+ */
+public class TestExpressionEvaluators {
+  private static final DataType ROW_DATA_TYPE = DataTypes.ROW(
+      DataTypes.FIELD("f_tinyint", DataTypes.TINYINT()),
+      DataTypes.FIELD("f_smallint", DataTypes.SMALLINT()),
+      DataTypes.FIELD("f_int", DataTypes.INT()),
+      DataTypes.FIELD("f_long", DataTypes.BIGINT()),
+      DataTypes.FIELD("f_float", DataTypes.FLOAT()),
+      DataTypes.FIELD("f_double", DataTypes.DOUBLE()),
+      DataTypes.FIELD("f_boolean", DataTypes.BOOLEAN()),
+      DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(10, 2)),
+      DataTypes.FIELD("f_bytes", DataTypes.VARBINARY(10)),
+      DataTypes.FIELD("f_string", DataTypes.VARCHAR(10)),
+      DataTypes.FIELD("f_time", DataTypes.TIME(3)),
+      DataTypes.FIELD("f_date", DataTypes.DATE()),
+      DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3))
+  ).notNull();
+  private static final DataType INDEX_ROW_DATA_TYPE = DataTypes.ROW(
+      DataTypes.FIELD("file_name", DataTypes.STRING()),
+      DataTypes.FIELD("value_cnt", DataTypes.BIGINT()),
+      DataTypes.FIELD("f_int_min", DataTypes.INT()),
+      DataTypes.FIELD("f_int_max", DataTypes.INT()),
+      DataTypes.FIELD("f_int_null_cnt", DataTypes.BIGINT()),
+      DataTypes.FIELD("f_string_min", DataTypes.VARCHAR(10)),
+      DataTypes.FIELD("f_string_max", DataTypes.VARCHAR(10)),
+      DataTypes.FIELD("f_string_null_cnt", DataTypes.BIGINT()),
+      DataTypes.FIELD("f_timestamp_min", DataTypes.TIMESTAMP(3)),
+      DataTypes.FIELD("f_timestamp_max", DataTypes.TIMESTAMP(3)),
+      DataTypes.FIELD("f_timestamp_null_cnt", DataTypes.BIGINT())
+  ).notNull();
+
+  private static final RowType INDEX_ROW_TYPE = (RowType) 
INDEX_ROW_DATA_TYPE.getLogicalType();
+
+  @Test
+  void testEqualTo() {
+    ExpressionEvaluators.EqualTo equalTo = 
ExpressionEvaluators.EqualTo.getInstance();
+    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
+    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+    equalTo.bindVal(vExpr)
+        .bindFieldReference(rExpr);
+    RowData indexRow1 = intIndexRow(11, 13);
+    Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1, 
queryFields(2));
+    assertTrue(equalTo.eval(stats1), "11 < 12 < 13");
+
+    RowData indexRow2 = intIndexRow(12, 13);
+    Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2, 
queryFields(2));
+    assertTrue(equalTo.eval(stats2), "12 <= 12 < 13");
+
+    RowData indexRow3 = intIndexRow(11, 12);
+    Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3, 
queryFields(2));
+    assertTrue(equalTo.eval(stats3), "11 < 12 <= 12");
+
+    RowData indexRow4 = intIndexRow(10, 11);
+    Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4, 
queryFields(2));
+    assertFalse(equalTo.eval(stats4), "11 < 12");
+
+    RowData indexRow5 = intIndexRow(13, 14);
+    Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5, 
queryFields(2));
+    assertFalse(equalTo.eval(stats5), "12 < 13");
+
+    RowData indexRow6 = intIndexRow(null, null);
+    Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6, 
queryFields(2));
+    assertFalse(equalTo.eval(stats6), "12 <> null");
+
+    equalTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
+    assertFalse(equalTo.eval(stats1), "It is not possible to test for NULL 
values with '=' operator");
+  }
+
+  @Test
+  void testNotEqualTo() {
+    ExpressionEvaluators.NotEqualTo notEqualTo = 
ExpressionEvaluators.NotEqualTo.getInstance();
+    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
+    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+    RowData indexRow1 = intIndexRow(11, 13);
+    notEqualTo.bindVal(vExpr)
+        .bindFieldReference(rExpr);
+    Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1, 
queryFields(2));
+    assertTrue(notEqualTo.eval(stats1), "11 <> 12 && 12 <> 13");
+
+    RowData indexRow2 = intIndexRow(12, 13);
+    Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2, 
queryFields(2));
+    assertTrue(notEqualTo.eval(stats2), "12 <> 13");
+
+    RowData indexRow3 = intIndexRow(11, 12);
+    Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3, 
queryFields(2));
+    assertTrue(notEqualTo.eval(stats3), "11 <> 12");
+
+    RowData indexRow4 = intIndexRow(10, 11);
+    Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4, 
queryFields(2));
+    assertTrue(notEqualTo.eval(stats4), "10 <> 12 and 11 < 12");
+
+    RowData indexRow5 = intIndexRow(13, 14);
+    Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5, 
queryFields(2));
+    assertTrue(notEqualTo.eval(stats5), "12 <> 13 and 12 <> 14");
+
+    RowData indexRow6 = intIndexRow(null, null);
+    Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6, 
queryFields(2));
+    assertTrue(notEqualTo.eval(stats6), "12 <> null");
+
+    notEqualTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
+    assertFalse(notEqualTo.eval(stats1), "It is not possible to test for NULL 
values with '<>' operator");
+  }
+
+  @Test
+  void testIsNull() {
+    ExpressionEvaluators.IsNull isNull = 
ExpressionEvaluators.IsNull.getInstance();
+    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
+
+    RowData indexRow1 = intIndexRow(11, 13);
+    isNull.bindFieldReference(rExpr);
+    Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1, 
queryFields(2));
+    assertTrue(isNull.eval(stats1), "2 nulls");
+
+    RowData indexRow2 = intIndexRow(12, 13, 0L);
+    Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2, 
queryFields(2));
+    assertFalse(isNull.eval(stats2), "0 nulls");
+  }
+
+  @Test
+  void testIsNotNull() {
+    ExpressionEvaluators.IsNotNull isNotNull = 
ExpressionEvaluators.IsNotNull.getInstance();
+    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
+
+    RowData indexRow1 = intIndexRow(11, 13);
+    isNotNull.bindFieldReference(rExpr);
+    Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1, 
queryFields(2));
+    assertTrue(isNotNull.eval(stats1), "min 11 is not null");
+
+    RowData indexRow2 = intIndexRow(null, null, 0L);
+    Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2, 
queryFields(2));
+    assertTrue(isNotNull.eval(stats2), "min is null and 0 nulls");
+  }
+
+  @Test
+  void testLessThan() {
+    ExpressionEvaluators.LessThan lessThan = 
ExpressionEvaluators.LessThan.getInstance();
+    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
+    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+    RowData indexRow1 = intIndexRow(11, 13);
+    lessThan.bindVal(vExpr)
+        .bindFieldReference(rExpr);
+    Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1, 
queryFields(2));
+    assertTrue(lessThan.eval(stats1), "12 < 13");
+
+    RowData indexRow2 = intIndexRow(12, 13);
+    Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2, 
queryFields(2));
+    assertFalse(lessThan.eval(stats2), "min 12 = 12");
+
+    RowData indexRow3 = intIndexRow(11, 12);
+    Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3, 
queryFields(2));
+    assertTrue(lessThan.eval(stats3), "11 < 12");
+
+    RowData indexRow4 = intIndexRow(10, 11);
+    Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4, 
queryFields(2));
+    assertTrue(lessThan.eval(stats4), "11 < 12");
+
+    RowData indexRow5 = intIndexRow(13, 14);
+    Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5, 
queryFields(2));
+    assertFalse(lessThan.eval(stats5), "12 < min 13");
+
+    RowData indexRow6 = intIndexRow(null, null);
+    Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6, 
queryFields(2));
+    assertFalse(lessThan.eval(stats6), "12 <> null");
+
+    lessThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
+    assertFalse(lessThan.eval(stats1), "It is not possible to test for NULL 
values with '<' operator");
+  }
+
+  @Test
+  void testGreaterThan() {
+    ExpressionEvaluators.GreaterThan greaterThan = 
ExpressionEvaluators.GreaterThan.getInstance();
+    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
+    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+    RowData indexRow1 = intIndexRow(11, 13);
+    greaterThan.bindVal(vExpr)
+        .bindFieldReference(rExpr);
+    Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1, 
queryFields(2));
+    assertTrue(greaterThan.eval(stats1), "12 < 13");
+
+    RowData indexRow2 = intIndexRow(12, 13);
+    Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2, 
queryFields(2));
+    assertTrue(greaterThan.eval(stats2), "12 < 13");
+
+    RowData indexRow3 = intIndexRow(11, 12);
+    Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3, 
queryFields(2));
+    assertFalse(greaterThan.eval(stats3), "max 12 = 12");
+
+    RowData indexRow4 = intIndexRow(10, 11);
+    Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4, 
queryFields(2));
+    assertFalse(greaterThan.eval(stats4), "max 11 < 12");
+
+    RowData indexRow5 = intIndexRow(13, 14);
+    Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5, 
queryFields(2));
+    assertTrue(greaterThan.eval(stats5), "12 < 13");
+
+    RowData indexRow6 = intIndexRow(null, null);
+    Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6, 
queryFields(2));
+    assertFalse(greaterThan.eval(stats6), "12 <> null");
+
+    greaterThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
+    assertFalse(greaterThan.eval(stats1), "It is not possible to test for NULL 
values with '>' operator");
+  }
+
+  @Test
+  void testLessThanOrEqual() {
+    ExpressionEvaluators.LessThanOrEqual lessThanOrEqual = 
ExpressionEvaluators.LessThanOrEqual.getInstance();
+    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
+    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+    RowData indexRow1 = intIndexRow(11, 13);
+    lessThanOrEqual.bindVal(vExpr)
+        .bindFieldReference(rExpr);
+    Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1, 
queryFields(2));
+    assertTrue(lessThanOrEqual.eval(stats1), "11 < 12");
+
+    RowData indexRow2 = intIndexRow(12, 13);
+    Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2, 
queryFields(2));
+    assertTrue(lessThanOrEqual.eval(stats2), "min 12 = 12");
+
+    RowData indexRow3 = intIndexRow(11, 12);
+    Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3, 
queryFields(2));
+    assertTrue(lessThanOrEqual.eval(stats3), "max 12 = 12");
+
+    RowData indexRow4 = intIndexRow(10, 11);
+    Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4, 
queryFields(2));
+    assertTrue(lessThanOrEqual.eval(stats4), "max 11 < 12");
+
+    RowData indexRow5 = intIndexRow(13, 14);
+    Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5, 
queryFields(2));
+    assertFalse(lessThanOrEqual.eval(stats5), "12 < 13");
+
+    RowData indexRow6 = intIndexRow(null, null);
+    Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6, 
queryFields(2));
+    assertFalse(lessThanOrEqual.eval(stats6), "12 <> null");
+
+    lessThanOrEqual.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
+    assertFalse(lessThanOrEqual.eval(stats1), "It is not possible to test for 
NULL values with '<=' operator");
+  }
+
+  @Test
+  void testGreaterThanOrEqual() {
+    ExpressionEvaluators.GreaterThanOrEqual greaterThanOrEqual = 
ExpressionEvaluators.GreaterThanOrEqual.getInstance();
+    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
+    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+    RowData indexRow1 = intIndexRow(11, 13);
+    greaterThanOrEqual.bindVal(vExpr)
+        .bindFieldReference(rExpr);
+    Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1, 
queryFields(2));
+    assertTrue(greaterThanOrEqual.eval(stats1), "12 < 13");
+
+    RowData indexRow2 = intIndexRow(12, 13);
+    Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2, 
queryFields(2));
+    assertTrue(greaterThanOrEqual.eval(stats2), "min 12 = 12");
+
+    RowData indexRow3 = intIndexRow(11, 12);
+    Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3, 
queryFields(2));
+    assertTrue(greaterThanOrEqual.eval(stats3), "max 12 = 12");
+
+    RowData indexRow4 = intIndexRow(10, 11);
+    Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4, 
queryFields(2));
+    assertFalse(greaterThanOrEqual.eval(stats4), "max 11 < 12");
+
+    RowData indexRow5 = intIndexRow(13, 14);
+    Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5, 
queryFields(2));
+    assertTrue(greaterThanOrEqual.eval(stats5), "12 < 13");
+
+    RowData indexRow6 = intIndexRow(null, null);
+    Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6, 
queryFields(2));
+    assertFalse(greaterThanOrEqual.eval(stats6), "12 <> null");
+
+    greaterThanOrEqual.bindVal(new ValueLiteralExpression(null, 
DataTypes.INT()));
+    assertFalse(greaterThanOrEqual.eval(stats1), "It is not possible to test 
for NULL values with '>=' operator");
+  }
+
+  @Test
+  void testIn() {
+    ExpressionEvaluators.In in = ExpressionEvaluators.In.getInstance();
+    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
+
+    RowData indexRow1 = intIndexRow(11, 13);
+    in.bindFieldReference(rExpr);
+    in.bindVals(11, 12);
+    Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1, 
queryFields(2));
+    assertTrue(in.eval(stats1), "11 < 12 < 13");
+
+    RowData indexRow2 = intIndexRow(12, 13);
+    Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2, 
queryFields(2));
+    assertTrue(in.eval(stats2), "min 12 = 12");
+
+    RowData indexRow3 = intIndexRow(11, 12);
+    Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3, 
queryFields(2));
+    assertTrue(in.eval(stats3), "max 12 = 12");
+
+    RowData indexRow4 = intIndexRow(10, 11);
+    Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4, 
queryFields(2));
+    assertTrue(in.eval(stats4), "max 11 = 11");
+
+    RowData indexRow5 = intIndexRow(13, 14);
+    Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5, 
queryFields(2));
+    assertFalse(in.eval(stats5), "12 < 13");
+
+    RowData indexRow6 = intIndexRow(null, null);
+    Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6, 
queryFields(2));
+    assertFalse(in.eval(stats6), "12 <> null");
+
+    in.bindVals((Object) null);
+    assertFalse(in.eval(stats1), "It is not possible to test for NULL values 
with 'in' operator");
+  }
+
+  @Test
+  void testAlwaysFalse() {
+    FieldReferenceExpression ref = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
+    ValueLiteralExpression nullLiteral = new ValueLiteralExpression(null, 
DataTypes.INT());
+    RowData indexRow = intIndexRow(11, 13);
+    Map<String, ColumnStats> stats = convertColumnStats(indexRow, 
queryFields(2));
+    FunctionDefinition[] funDefs = new FunctionDefinition[] {
+        BuiltInFunctionDefinitions.EQUALS,
+        BuiltInFunctionDefinitions.NOT_EQUALS,
+        BuiltInFunctionDefinitions.LESS_THAN,
+        BuiltInFunctionDefinitions.GREATER_THAN,
+        BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+        BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
+        BuiltInFunctionDefinitions.IN};
+    for (FunctionDefinition funDef : funDefs) {
+      CallExpression expr =
+          new CallExpression(
+              funDef,
+              Arrays.asList(ref, nullLiteral),
+              DataTypes.BOOLEAN());
+      // always return false if the literal value is null
+      assertFalse(fromExpression(expr).eval(stats));
+    }
+  }
+
+  private static RowData intIndexRow(Integer minVal, Integer maxVal) {
+    return intIndexRow(minVal, maxVal, 2L);
+  }
+
+  private static RowData intIndexRow(Integer minVal, Integer maxVal, Long 
nullCnt) {
+    return indexRow(StringData.fromString("f1"), 100L,
+        minVal, maxVal, nullCnt,
+        StringData.fromString("1"), StringData.fromString("100"), 5L,
+        TimestampData.fromEpochMillis(1), TimestampData.fromEpochMillis(100), 
3L);
+  }
+
+  private static RowData indexRow(Object... fields) {
+    return TestData.insertRow(INDEX_ROW_TYPE, fields);
+  }
+
+  private static RowType.RowField[] queryFields(int... pos) {
+    List<RowType.RowField> fields = ((RowType) 
ROW_DATA_TYPE.getLogicalType()).getFields();
+    return 
Arrays.stream(pos).mapToObj(fields::get).toArray(RowType.RowField[]::new);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
deleted file mode 100644
index e1fe27c3bb5..00000000000
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.stats;
-
-import org.apache.hudi.utils.TestData;
-
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.expressions.CallExpression;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ValueLiteralExpression;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static 
org.apache.hudi.source.stats.ExpressionEvaluator.Evaluator.bindCall;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Test cases for {@link ExpressionEvaluator}.
- */
-public class TestExpressionEvaluator {
-  private static final DataType ROW_DATA_TYPE = DataTypes.ROW(
-      DataTypes.FIELD("f_tinyint", DataTypes.TINYINT()),
-      DataTypes.FIELD("f_smallint", DataTypes.SMALLINT()),
-      DataTypes.FIELD("f_int", DataTypes.INT()),
-      DataTypes.FIELD("f_long", DataTypes.BIGINT()),
-      DataTypes.FIELD("f_float", DataTypes.FLOAT()),
-      DataTypes.FIELD("f_double", DataTypes.DOUBLE()),
-      DataTypes.FIELD("f_boolean", DataTypes.BOOLEAN()),
-      DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(10, 2)),
-      DataTypes.FIELD("f_bytes", DataTypes.VARBINARY(10)),
-      DataTypes.FIELD("f_string", DataTypes.VARCHAR(10)),
-      DataTypes.FIELD("f_time", DataTypes.TIME(3)),
-      DataTypes.FIELD("f_date", DataTypes.DATE()),
-      DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3))
-  ).notNull();
-  private static final DataType INDEX_ROW_DATA_TYPE = DataTypes.ROW(
-      DataTypes.FIELD("file_name", DataTypes.STRING()),
-      DataTypes.FIELD("value_cnt", DataTypes.BIGINT()),
-      DataTypes.FIELD("f_int_min", DataTypes.INT()),
-      DataTypes.FIELD("f_int_max", DataTypes.INT()),
-      DataTypes.FIELD("f_int_null_cnt", DataTypes.BIGINT()),
-      DataTypes.FIELD("f_string_min", DataTypes.VARCHAR(10)),
-      DataTypes.FIELD("f_string_max", DataTypes.VARCHAR(10)),
-      DataTypes.FIELD("f_string_null_cnt", DataTypes.BIGINT()),
-      DataTypes.FIELD("f_timestamp_min", DataTypes.TIMESTAMP(3)),
-      DataTypes.FIELD("f_timestamp_max", DataTypes.TIMESTAMP(3)),
-      DataTypes.FIELD("f_timestamp_null_cnt", DataTypes.BIGINT())
-  ).notNull();
-
-  private static final RowType INDEX_ROW_TYPE = (RowType) 
INDEX_ROW_DATA_TYPE.getLogicalType();
-
-  @Test
-  void testEqualTo() {
-    ExpressionEvaluator.EqualTo equalTo = 
ExpressionEvaluator.EqualTo.getInstance();
-    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
-    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
-    RowData indexRow1 = intIndexRow(11, 13);
-    equalTo.bindFieldReference(rExpr)
-        .bindVal(vExpr)
-        .bindColStats(indexRow1, queryFields(2), rExpr);
-    assertTrue(equalTo.eval(), "11 < 12 < 13");
-
-    RowData indexRow2 = intIndexRow(12, 13);
-    equalTo.bindColStats(indexRow2, queryFields(2), rExpr);
-    assertTrue(equalTo.eval(), "12 <= 12 < 13");
-
-    RowData indexRow3 = intIndexRow(11, 12);
-    equalTo.bindColStats(indexRow3, queryFields(2), rExpr);
-    assertTrue(equalTo.eval(), "11 < 12 <= 12");
-
-    RowData indexRow4 = intIndexRow(10, 11);
-    equalTo.bindColStats(indexRow4, queryFields(2), rExpr);
-    assertFalse(equalTo.eval(), "11 < 12");
-
-    RowData indexRow5 = intIndexRow(13, 14);
-    equalTo.bindColStats(indexRow5, queryFields(2), rExpr);
-    assertFalse(equalTo.eval(), "12 < 13");
-
-    RowData indexRow6 = intIndexRow(null, null);
-    equalTo.bindColStats(indexRow6, queryFields(2), rExpr);
-    assertFalse(equalTo.eval(), "12 <> null");
-
-    equalTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
-    assertFalse(equalTo.eval(), "It is not possible to test for NULL values 
with '=' operator");
-  }
-
-  @Test
-  void testNotEqualTo() {
-    ExpressionEvaluator.NotEqualTo notEqualTo = 
ExpressionEvaluator.NotEqualTo.getInstance();
-    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
-    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
-    RowData indexRow1 = intIndexRow(11, 13);
-    notEqualTo.bindFieldReference(rExpr)
-        .bindVal(vExpr)
-        .bindColStats(indexRow1, queryFields(2), rExpr);
-    assertTrue(notEqualTo.eval(), "11 <> 12 && 12 <> 13");
-
-    RowData indexRow2 = intIndexRow(12, 13);
-    notEqualTo.bindColStats(indexRow2, queryFields(2), rExpr);
-    assertTrue(notEqualTo.eval(), "12 <> 13");
-
-    RowData indexRow3 = intIndexRow(11, 12);
-    notEqualTo.bindColStats(indexRow3, queryFields(2), rExpr);
-    assertTrue(notEqualTo.eval(), "11 <> 12");
-
-    RowData indexRow4 = intIndexRow(10, 11);
-    notEqualTo.bindColStats(indexRow4, queryFields(2), rExpr);
-    assertTrue(notEqualTo.eval(), "10 <> 12 and 11 < 12");
-
-    RowData indexRow5 = intIndexRow(13, 14);
-    notEqualTo.bindColStats(indexRow5, queryFields(2), rExpr);
-    assertTrue(notEqualTo.eval(), "12 <> 13 and 12 <> 14");
-
-    RowData indexRow6 = intIndexRow(null, null);
-    notEqualTo.bindColStats(indexRow6, queryFields(2), rExpr);
-    assertTrue(notEqualTo.eval(), "12 <> null");
-
-    notEqualTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
-    assertFalse(notEqualTo.eval(), "It is not possible to test for NULL values 
with '<>' operator");
-  }
-
-  @Test
-  void testIsNull() {
-    ExpressionEvaluator.IsNull isNull = 
ExpressionEvaluator.IsNull.getInstance();
-    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
-
-    RowData indexRow1 = intIndexRow(11, 13);
-    isNull.bindFieldReference(rExpr)
-        .bindColStats(indexRow1, queryFields(2), rExpr);
-    assertTrue(isNull.eval(), "2 nulls");
-
-    RowData indexRow2 = intIndexRow(12, 13, 0L);
-    isNull.bindColStats(indexRow2, queryFields(2), rExpr);
-    assertFalse(isNull.eval(), "0 nulls");
-  }
-
-  @Test
-  void testIsNotNull() {
-    ExpressionEvaluator.IsNotNull isNotNull = 
ExpressionEvaluator.IsNotNull.getInstance();
-    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
-
-    RowData indexRow1 = intIndexRow(11, 13);
-    isNotNull.bindFieldReference(rExpr)
-        .bindColStats(indexRow1, queryFields(2), rExpr);
-    assertTrue(isNotNull.eval(), "min 11 is not null");
-
-    RowData indexRow2 = intIndexRow(null, null, 0L);
-    isNotNull.bindColStats(indexRow2, queryFields(2), rExpr);
-    assertTrue(isNotNull.eval(), "min is null and 0 nulls");
-  }
-
-  @Test
-  void testLessThan() {
-    ExpressionEvaluator.LessThan lessThan = 
ExpressionEvaluator.LessThan.getInstance();
-    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
-    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
-    RowData indexRow1 = intIndexRow(11, 13);
-    lessThan.bindFieldReference(rExpr)
-        .bindVal(vExpr)
-        .bindColStats(indexRow1, queryFields(2), rExpr);
-    assertTrue(lessThan.eval(), "12 < 13");
-
-    RowData indexRow2 = intIndexRow(12, 13);
-    lessThan.bindColStats(indexRow2, queryFields(2), rExpr);
-    assertFalse(lessThan.eval(), "min 12 = 12");
-
-    RowData indexRow3 = intIndexRow(11, 12);
-    lessThan.bindColStats(indexRow3, queryFields(2), rExpr);
-    assertTrue(lessThan.eval(), "11 < 12");
-
-    RowData indexRow4 = intIndexRow(10, 11);
-    lessThan.bindColStats(indexRow4, queryFields(2), rExpr);
-    assertTrue(lessThan.eval(), "11 < 12");
-
-    RowData indexRow5 = intIndexRow(13, 14);
-    lessThan.bindColStats(indexRow5, queryFields(2), rExpr);
-    assertFalse(lessThan.eval(), "12 < min 13");
-
-    RowData indexRow6 = intIndexRow(null, null);
-    lessThan.bindColStats(indexRow6, queryFields(2), rExpr);
-    assertFalse(lessThan.eval(), "12 <> null");
-
-    lessThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
-    assertFalse(lessThan.eval(), "It is not possible to test for NULL values 
with '<' operator");
-  }
-
-  @Test
-  void testGreaterThan() {
-    ExpressionEvaluator.GreaterThan greaterThan = 
ExpressionEvaluator.GreaterThan.getInstance();
-    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
-    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
-    RowData indexRow1 = intIndexRow(11, 13);
-    greaterThan.bindFieldReference(rExpr)
-        .bindVal(vExpr)
-        .bindColStats(indexRow1, queryFields(2), rExpr);
-    assertTrue(greaterThan.eval(), "12 < 13");
-
-    RowData indexRow2 = intIndexRow(12, 13);
-    greaterThan.bindColStats(indexRow2, queryFields(2), rExpr);
-    assertTrue(greaterThan.eval(), "12 < 13");
-
-    RowData indexRow3 = intIndexRow(11, 12);
-    greaterThan.bindColStats(indexRow3, queryFields(2), rExpr);
-    assertFalse(greaterThan.eval(), "max 12 = 12");
-
-    RowData indexRow4 = intIndexRow(10, 11);
-    greaterThan.bindColStats(indexRow4, queryFields(2), rExpr);
-    assertFalse(greaterThan.eval(), "max 11 < 12");
-
-    RowData indexRow5 = intIndexRow(13, 14);
-    greaterThan.bindColStats(indexRow5, queryFields(2), rExpr);
-    assertTrue(greaterThan.eval(), "12 < 13");
-
-    RowData indexRow6 = intIndexRow(null, null);
-    greaterThan.bindColStats(indexRow6, queryFields(2), rExpr);
-    assertFalse(greaterThan.eval(), "12 <> null");
-
-    greaterThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
-    assertFalse(greaterThan.eval(), "It is not possible to test for NULL 
values with '>' operator");
-  }
-
-  @Test
-  void testLessThanOrEqual() {
-    ExpressionEvaluator.LessThanOrEqual lessThanOrEqual = 
ExpressionEvaluator.LessThanOrEqual.getInstance();
-    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
-    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
-    RowData indexRow1 = intIndexRow(11, 13);
-    lessThanOrEqual.bindFieldReference(rExpr)
-        .bindVal(vExpr)
-        .bindColStats(indexRow1, queryFields(2), rExpr);
-    assertTrue(lessThanOrEqual.eval(), "11 < 12");
-
-    RowData indexRow2 = intIndexRow(12, 13);
-    lessThanOrEqual.bindColStats(indexRow2, queryFields(2), rExpr);
-    assertTrue(lessThanOrEqual.eval(), "min 12 = 12");
-
-    RowData indexRow3 = intIndexRow(11, 12);
-    lessThanOrEqual.bindColStats(indexRow3, queryFields(2), rExpr);
-    assertTrue(lessThanOrEqual.eval(), "max 12 = 12");
-
-    RowData indexRow4 = intIndexRow(10, 11);
-    lessThanOrEqual.bindColStats(indexRow4, queryFields(2), rExpr);
-    assertTrue(lessThanOrEqual.eval(), "max 11 < 12");
-
-    RowData indexRow5 = intIndexRow(13, 14);
-    lessThanOrEqual.bindColStats(indexRow5, queryFields(2), rExpr);
-    assertFalse(lessThanOrEqual.eval(), "12 < 13");
-
-    RowData indexRow6 = intIndexRow(null, null);
-    lessThanOrEqual.bindColStats(indexRow6, queryFields(2), rExpr);
-    assertFalse(lessThanOrEqual.eval(), "12 <> null");
-
-    lessThanOrEqual.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
-    assertFalse(lessThanOrEqual.eval(), "It is not possible to test for NULL 
values with '<=' operator");
-  }
-
-  @Test
-  void testGreaterThanOrEqual() {
-    ExpressionEvaluator.GreaterThanOrEqual greaterThanOrEqual = 
ExpressionEvaluator.GreaterThanOrEqual.getInstance();
-    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
-    ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
-    RowData indexRow1 = intIndexRow(11, 13);
-    greaterThanOrEqual.bindFieldReference(rExpr)
-        .bindVal(vExpr)
-        .bindColStats(indexRow1, queryFields(2), rExpr);
-    assertTrue(greaterThanOrEqual.eval(), "12 < 13");
-
-    RowData indexRow2 = intIndexRow(12, 13);
-    greaterThanOrEqual.bindColStats(indexRow2, queryFields(2), rExpr);
-    assertTrue(greaterThanOrEqual.eval(), "min 12 = 12");
-
-    RowData indexRow3 = intIndexRow(11, 12);
-    greaterThanOrEqual.bindColStats(indexRow3, queryFields(2), rExpr);
-    assertTrue(greaterThanOrEqual.eval(), "max 12 = 12");
-
-    RowData indexRow4 = intIndexRow(10, 11);
-    greaterThanOrEqual.bindColStats(indexRow4, queryFields(2), rExpr);
-    assertFalse(greaterThanOrEqual.eval(), "max 11 < 12");
-
-    RowData indexRow5 = intIndexRow(13, 14);
-    greaterThanOrEqual.bindColStats(indexRow5, queryFields(2), rExpr);
-    assertTrue(greaterThanOrEqual.eval(), "12 < 13");
-
-    RowData indexRow6 = intIndexRow(null, null);
-    greaterThanOrEqual.bindColStats(indexRow6, queryFields(2), rExpr);
-    assertFalse(greaterThanOrEqual.eval(), "12 <> null");
-
-    greaterThanOrEqual.bindVal(new ValueLiteralExpression(null, 
DataTypes.INT()));
-    assertFalse(greaterThanOrEqual.eval(), "It is not possible to test for 
NULL values with '>=' operator");
-  }
-
-  @Test
-  void testIn() {
-    ExpressionEvaluator.In in = ExpressionEvaluator.In.getInstance();
-    FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
-
-    RowData indexRow1 = intIndexRow(11, 13);
-    in.bindFieldReference(rExpr)
-        .bindColStats(indexRow1, queryFields(2), rExpr);
-    in.bindVals(12);
-    assertTrue(in.eval(), "11 < 12 < 13");
-
-    RowData indexRow2 = intIndexRow(12, 13);
-    in.bindColStats(indexRow2, queryFields(2), rExpr);
-    assertTrue(in.eval(), "min 12 = 12");
-
-    RowData indexRow3 = intIndexRow(11, 12);
-    in.bindColStats(indexRow3, queryFields(2), rExpr);
-    assertTrue(in.eval(), "max 12 = 12");
-
-    RowData indexRow4 = intIndexRow(10, 11);
-    in.bindColStats(indexRow4, queryFields(2), rExpr);
-    assertFalse(in.eval(), "max 11 < 12");
-
-    RowData indexRow5 = intIndexRow(13, 14);
-    in.bindColStats(indexRow5, queryFields(2), rExpr);
-    assertFalse(in.eval(), "12 < 13");
-
-    RowData indexRow6 = intIndexRow(null, null);
-    in.bindColStats(indexRow6, queryFields(2), rExpr);
-    assertFalse(in.eval(), "12 <> null");
-
-    in.bindVals((Object) null);
-    assertFalse(in.eval(), "It is not possible to test for NULL values with 
'in' operator");
-  }
-
-  @Test
-  void testAlwaysFalse() {
-    FieldReferenceExpression ref = new FieldReferenceExpression("f_int", 
DataTypes.INT(), 2, 2);
-    ValueLiteralExpression nullLiteral = new ValueLiteralExpression(null, 
DataTypes.INT());
-    RowData indexRow = intIndexRow(11, 13);
-    RowType.RowField[] queryFields = queryFields(2);
-    FunctionDefinition[] funDefs = new FunctionDefinition[] {
-        BuiltInFunctionDefinitions.EQUALS,
-        BuiltInFunctionDefinitions.NOT_EQUALS,
-        BuiltInFunctionDefinitions.LESS_THAN,
-        BuiltInFunctionDefinitions.GREATER_THAN,
-        BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
-        BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
-        BuiltInFunctionDefinitions.IN};
-    for (FunctionDefinition funDef : funDefs) {
-      CallExpression expr =
-          new CallExpression(
-              funDef,
-              Arrays.asList(ref, nullLiteral),
-              DataTypes.BOOLEAN());
-      // always return false if the literal value is null
-      assertFalse(bindCall(expr, indexRow, queryFields).eval());
-    }
-  }
-
-  private static RowData intIndexRow(Integer minVal, Integer maxVal) {
-    return intIndexRow(minVal, maxVal, 2L);
-  }
-
-  private static RowData intIndexRow(Integer minVal, Integer maxVal, Long 
nullCnt) {
-    return indexRow(StringData.fromString("f1"), 100L,
-        minVal, maxVal, nullCnt,
-        StringData.fromString("1"), StringData.fromString("100"), 5L,
-        TimestampData.fromEpochMillis(1), TimestampData.fromEpochMillis(100), 
3L);
-  }
-
-  private static RowData indexRow(Object... fields) {
-    return TestData.insertRow(INDEX_ROW_TYPE, fields);
-  }
-
-  private static RowType.RowField[] queryFields(int... pos) {
-    List<RowType.RowField> fields = ((RowType) 
ROW_DATA_TYPE.getLogicalType()).getFields();
-    return 
Arrays.stream(pos).mapToObj(fields::get).toArray(RowType.RowField[]::new);
-  }
-}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 5aba8885a64..4c34a318bfd 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1583,6 +1583,13 @@ public class ITTestHoodieDataSource {
         + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
         + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+    // filter by in expression
+    List<Row> result4 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1 where uuid in ('id6', 'id7', 
'id8')").execute().collect());
+    assertRowsEquals(result4, "["
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
   @Test


Reply via email to