This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 79428391bac [HUDI-5851] Improvement of data skipping, only converts
expressions to evaluators once (#8051)
79428391bac is described below
commit 79428391bac7277ffa9e18c75594a6fb9b8c5665
Author: Jing Zhang <[email protected]>
AuthorDate: Fri Mar 10 14:53:16 2023 +0800
[HUDI-5851] Improvement of data skipping, only converts expressions to
evaluators once (#8051)
* Add log to FileIndex about the data skipping info
* Move all evaluators and related utilities into one class
---
.../java/org/apache/hudi/source/DataPruner.java | 140 +++++
.../apache/hudi/source/ExpressionEvaluators.java | 576 ++++++++++++++++++++
.../java/org/apache/hudi/source/FileIndex.java | 46 +-
.../org/apache/hudi/source/stats/ColumnStats.java | 72 +++
.../hudi/source/stats/ExpressionEvaluator.java | 605 ---------------------
.../hudi/source/TestExpressionEvaluators.java | 408 ++++++++++++++
.../hudi/source/stats/TestExpressionEvaluator.java | 403 --------------
.../apache/hudi/table/ITTestHoodieDataSource.java | 7 +
8 files changed, 1230 insertions(+), 1027 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/DataPruner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/DataPruner.java
new file mode 100644
index 00000000000..605fcdf7fb0
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/DataPruner.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.util.ExpressionUtils;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.source.ExpressionEvaluators.fromExpression;
+
+/**
+ * Utility to do data skipping.
+ */
+public class DataPruner implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String[] referencedCols;
+ private final List<ExpressionEvaluators.Evaluator> evaluators;
+
+  /**
+   * Creates a pruner over already-converted evaluators; the constructor is private and
+   * is invoked from {@link #newInstance(List)}.
+   *
+   * @param referencedCols names of the columns referenced by the filters
+   * @param evaluators     evaluators converted from the filter expressions
+   */
+  private DataPruner(String[] referencedCols, List<ExpressionEvaluators.Evaluator> evaluators) {
+    this.evaluators = evaluators;
+    this.referencedCols = referencedCols;
+  }
+
+  /**
+   * Checks one index row against every evaluator held by this pruner.
+   *
+   * <p>The row is first decoded into per-column {@link ColumnStats}; the evaluators are then
+   * applied in list order and evaluation stops at the first one that rejects the row.
+   *
+   * @param indexRow    The index row
+   * @param queryFields The query fields referenced by the filters
+   * @return true if every evaluator accepts the row, i.e. it should be kept as a candidate
+   */
+  public boolean test(RowData indexRow, RowType.RowField[] queryFields) {
+    final Map<String, ColumnStats> statsByColumn = convertColumnStats(indexRow, queryFields);
+    return evaluators.stream().allMatch(candidate -> candidate.eval(statsByColumn));
+  }
+
+  /**
+   * Returns the column names this pruner was created with.
+   *
+   * <p>The internal array is returned as-is (no defensive copy).
+   */
+  public String[] getReferencedCols() {
+    return this.referencedCols;
+  }
+
+  /**
+   * Builds a pruner for the given filters, converting them to evaluators once.
+   *
+   * @param filters the pushed-down filter expressions, may be null
+   * @return the pruner, or {@code null} when there are no filters or the filters
+   *         reference no columns
+   */
+  public static DataPruner newInstance(List<ResolvedExpression> filters) {
+    if (filters == null || filters.isEmpty()) {
+      return null;
+    }
+    final String[] columns = ExpressionUtils.referencedColumns(filters);
+    return columns.length == 0
+        ? null
+        : new DataPruner(columns, fromExpression(filters));
+  }
+
+  /**
+   * Decodes an index row into a name-to-stats mapping, in the order of {@code queryFields}.
+   *
+   * <p>The stats of the i-th query field are read from three consecutive positions starting
+   * at {@code 2 + i * 3}: min value, max value and null count.
+   *
+   * @param indexRow    the index row to decode
+   * @param queryFields the fields whose stats are stored in the row
+   * @return insertion-ordered mapping from field name to its column stats
+   * @throws IllegalArgumentException if either argument is null
+   */
+  public static Map<String, ColumnStats> convertColumnStats(RowData indexRow, RowType.RowField[] queryFields) {
+    if (indexRow == null || queryFields == null) {
+      throw new IllegalArgumentException("Index Row and query fields could not be null.");
+    }
+    final Map<String, ColumnStats> statsByColumn = new LinkedHashMap<>();
+    int minPos = 2;
+    for (RowType.RowField field : queryFields) {
+      final String fieldName = field.getName();
+      final LogicalType fieldType = field.getType();
+      final int maxPos = minPos + 1;
+      Object lower = null;
+      if (!indexRow.isNullAt(minPos)) {
+        lower = getValAsJavaObj(indexRow, minPos, fieldType);
+      }
+      Object upper = null;
+      if (!indexRow.isNullAt(maxPos)) {
+        upper = getValAsJavaObj(indexRow, maxPos, fieldType);
+      }
+      final long nulls = indexRow.getLong(minPos + 2);
+      statsByColumn.put(fieldName, new ColumnStats(lower, upper, nulls));
+      minPos += 3;
+    }
+    return statsByColumn;
+  }
+
+ /**
+ * Returns the value as Java object at position {@code pos} of row {@code
indexRow}.
+ */
+ private static Object getValAsJavaObj(RowData indexRow, int pos, LogicalType
colType) {
+ switch (colType.getTypeRoot()) {
+ // NOTE: Since we can't rely on Avro's "date", and "timestamp-micros"
logical-types, we're
+ // manually encoding corresponding values as int and long w/in the
Column Stats Index and
+ // here we have to decode those back into corresponding logical
representation.
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ TimestampType tsType = (TimestampType) colType;
+ return indexRow.getTimestamp(pos,
tsType.getPrecision()).getMillisecond();
+ case TIME_WITHOUT_TIME_ZONE:
+ case DATE:
+ return indexRow.getLong(pos);
+ // NOTE: All integral types of size less than Int are encoded as Ints in
MT
+ case BOOLEAN:
+ return indexRow.getBoolean(pos);
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ return indexRow.getInt(pos);
+ case FLOAT:
+ return indexRow.getFloat(pos);
+ case DOUBLE:
+ return indexRow.getDouble(pos);
+ case BINARY:
+ case VARBINARY:
+ return indexRow.getBinary(pos);
+ case CHAR:
+ case VARCHAR:
+ return indexRow.getString(pos).toString();
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) colType;
+ return indexRow.getDecimal(pos, decimalType.getPrecision(),
decimalType.getScale()).toBigDecimal();
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " +
colType);
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
new file mode 100644
index 00000000000..a0cf9b1d89c
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.util.ExpressionUtils;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.validation.constraints.NotNull;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Tool to evaluate the {@link
org.apache.flink.table.expressions.ResolvedExpression}s.
+ */
+public class ExpressionEvaluators {
+
+  /**
+   * Converts each expression of the list to an evaluator, keeping the input order.
+   *
+   * <p>Every element is cast to {@link CallExpression} before conversion.
+   *
+   * @param exprs the resolved filter expressions
+   * @return one evaluator per input expression
+   */
+  public static List<Evaluator> fromExpression(List<ResolvedExpression> exprs) {
+    final List<Evaluator> converted = new ArrayList<>(exprs.size());
+    for (ResolvedExpression expr : exprs) {
+      converted.add(fromExpression((CallExpression) expr));
+    }
+    return converted;
+  }
+
+ /**
+ * Converts specific call expression to the evaluator.
+ * <p>Two steps to bind the call:
+ * 1. map the evaluator instance;
+ * 2. bind the field reference;
+ *
+ * <p>Normalize the expression to simplify the subsequent decision logic:
+ * always put the literal expression in the RHS. An expression counts as
+ * normalized when its first child is a field reference; for a non-normalized
+ * comparison the operator is mirrored (e.g. LESS_THAN is mapped to GreaterThan).
+ *
+ * <p>The function definitions are checked in this order:
+ * NOT / AND / OR (children are cast to CallExpression and converted recursively),
+ * IS_NULL / IS_NOT_NULL (the first child is cast to a field reference),
+ * a null-literal check (any literal child whose value is null yields AlwaysFalse),
+ * IN (must be normalized, otherwise the checkState call fails),
+ * and finally the binary comparison operators.
+ *
+ * <p>NOTE(review): the null-literal check runs before the IN handling, so an IN
+ * list that contains a NULL literal is turned into AlwaysFalse as a whole;
+ * confirm that this is the intended behavior.
+ *
+ * @param expr the call expression to convert
+ * @return the evaluator bound to the referenced field and literal value(s)
+ * @throws AssertionError if the function definition is none of the handled ones
+ */
+ public static Evaluator fromExpression(CallExpression expr) {
+ FunctionDefinition funDef = expr.getFunctionDefinition();
+ List<Expression> childExprs = expr.getChildren();
+
+ boolean normalized = childExprs.get(0) instanceof FieldReferenceExpression;
+
+ if (BuiltInFunctionDefinitions.NOT.equals(funDef)) {
+ Not evaluator = Not.getInstance();
+ Evaluator childEvaluator = fromExpression((CallExpression)
childExprs.get(0));
+ return evaluator.bindEvaluator(childEvaluator);
+ }
+
+ if (BuiltInFunctionDefinitions.AND.equals(funDef)) {
+ And evaluator = And.getInstance();
+ Evaluator evaluator1 = fromExpression((CallExpression)
childExprs.get(0));
+ Evaluator evaluator2 = fromExpression((CallExpression)
childExprs.get(1));
+ return evaluator.bindEvaluator(evaluator1, evaluator2);
+ }
+
+ if (BuiltInFunctionDefinitions.OR.equals(funDef)) {
+ Or evaluator = Or.getInstance();
+ Evaluator evaluator1 = fromExpression((CallExpression)
childExprs.get(0));
+ Evaluator evaluator2 = fromExpression((CallExpression)
childExprs.get(1));
+ return evaluator.bindEvaluator(evaluator1, evaluator2);
+ }
+
+ // handle unary operators
+ if (BuiltInFunctionDefinitions.IS_NULL.equals(funDef)) {
+ FieldReferenceExpression rExpr = (FieldReferenceExpression)
childExprs.get(0);
+ return IsNull.getInstance()
+ .bindFieldReference(rExpr);
+ } else if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(funDef)) {
+ FieldReferenceExpression rExpr = (FieldReferenceExpression)
childExprs.get(0);
+ return IsNotNull.getInstance()
+ .bindFieldReference(rExpr);
+ }
+
+ boolean hasNullLiteral =
+ childExprs.stream().anyMatch(e ->
+ e instanceof ValueLiteralExpression
+ &&
ExpressionUtils.getValueFromLiteral((ValueLiteralExpression) e) == null);
+ if (hasNullLiteral) {
+ return AlwaysFalse.getInstance();
+ }
+
+ // handle IN specifically
+ if (BuiltInFunctionDefinitions.IN.equals(funDef)) {
+ ValidationUtils.checkState(normalized, "The IN expression expects to be
normalized");
+ In in = In.getInstance();
+ FieldReferenceExpression rExpr = (FieldReferenceExpression)
childExprs.get(0);
+ in.bindFieldReference(rExpr);
+ in.bindVals(getInLiteralVals(childExprs));
+ return in;
+ }
+
+ NullFalseEvaluator evaluator;
+ // handle binary operators
+ if (BuiltInFunctionDefinitions.EQUALS.equals(funDef)) {
+ evaluator = EqualTo.getInstance();
+ } else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(funDef)) {
+ evaluator = NotEqualTo.getInstance();
+ } else if (BuiltInFunctionDefinitions.LESS_THAN.equals(funDef)) {
+ evaluator = normalized ? LessThan.getInstance() :
GreaterThan.getInstance();
+ } else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(funDef)) {
+ evaluator = normalized ? GreaterThan.getInstance() :
LessThan.getInstance();
+ } else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(funDef)) {
+ evaluator = normalized ? LessThanOrEqual.getInstance() :
GreaterThanOrEqual.getInstance();
+ } else if
(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(funDef)) {
+ evaluator = normalized ? GreaterThanOrEqual.getInstance() :
LessThanOrEqual.getInstance();
+ } else {
+ throw new AssertionError("Unexpected function definition " + funDef);
+ }
+ FieldReferenceExpression rExpr = normalized
+ ? (FieldReferenceExpression) childExprs.get(0)
+ : (FieldReferenceExpression) childExprs.get(1);
+ ValueLiteralExpression vExpr = normalized
+ ? (ValueLiteralExpression) childExprs.get(1)
+ : (ValueLiteralExpression) childExprs.get(0);
+ evaluator
+ .bindVal(vExpr)
+ .bindFieldReference(rExpr);
+ return evaluator;
+ }
+
+ /**
+ * Decides whether it's possible to match based on the column values and
+ * column stats.
+ * The evaluator can be nested: NOT, AND and OR evaluators wrap other evaluators.
+ * Evaluators are serializable.
+ */
+ public interface Evaluator extends Serializable {
+
+ /**
+ * Decides whether it's possible to match based on the column stats.
+ *
+ * @param columnStatsMap column statistics, keyed by column name
+ * @return true if the data described by the stats may contain matching rows,
+ * false if it can be skipped
+ */
+ boolean eval(Map<String, ColumnStats> columnStatsMap);
+ }
+
+  /**
+   * Base class of the evaluators that are bound to a single field reference.
+   */
+  public abstract static class LeafEvaluator implements Evaluator {
+
+    /** Logical type of the referenced field. */
+    protected LogicalType type;
+
+    /** Name of the referenced field; used as the lookup key into the stats map. */
+    protected String name;
+
+    /** Index of the referenced field. */
+    protected int index;
+
+    /**
+     * Copies name, index and logical type from the given field reference.
+     *
+     * @param expr the field reference to bind
+     * @return this evaluator, for chaining
+     */
+    public LeafEvaluator bindFieldReference(FieldReferenceExpression expr) {
+      this.name = expr.getName();
+      this.index = expr.getFieldIndex();
+      this.type = expr.getOutputDataType().getLogicalType();
+      return this;
+    }
+
+    /**
+     * Looks up the stats of the bound field by name; the state check fails when the
+     * map holds no entry for it.
+     */
+    protected ColumnStats getColumnStats(Map<String, ColumnStats> columnStatsMap) {
+      final ColumnStats stats = columnStatsMap.get(this.name);
+      ValidationUtils.checkState(stats != null, "Can not find column " + this.name);
+      return stats;
+    }
+  }
+
+  /**
+   * Leaf evaluator that compares the field against one literal value and never
+   * matches when that literal is null.
+   */
+  public abstract static class NullFalseEvaluator extends LeafEvaluator {
+
+    /** The constant literal value, may be null. */
+    protected Object val;
+
+    /**
+     * Extracts the value of the literal expression and stores it.
+     *
+     * @param vExpr the literal expression
+     * @return this evaluator, for chaining
+     */
+    public NullFalseEvaluator bindVal(ValueLiteralExpression vExpr) {
+      this.val = ExpressionUtils.getValueFromLiteral(vExpr);
+      return this;
+    }
+
+    /**
+     * Returns false for a null literal (without looking up the column stats);
+     * otherwise delegates to the three-argument {@code eval} of the subclass.
+     */
+    @Override
+    public final boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      return this.val != null
+          && eval(this.val, getColumnStats(columnStatsMap), this.type);
+    }
+
+    /**
+     * Decides whether the stats allow a match against the non-null literal.
+     */
+    protected abstract boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type);
+  }
+
+  /**
+   * Evaluates {@code col = literal}: may match only when both bounds are present
+   * and {@code min <= literal <= max}.
+   */
+  public static class EqualTo extends NullFalseEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static EqualTo getInstance() {
+      return new EqualTo();
+    }
+
+    @Override
+    protected boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type) {
+      final Object lower = columnStats.getMinVal();
+      final Object upper = columnStats.getMaxVal();
+      return lower != null
+          && upper != null
+          && compare(lower, val, type) <= 0
+          && compare(upper, val, type) >= 0;
+    }
+  }
+
+ /**
+ * To evaluate <> expr.
+ *
+ * <p>Always reports a possible match for a non-null literal: the min/max
+ * bounds alone cannot prove that every value of the column equals the literal.
+ * A null literal is rejected by the parent class before this method is called.
+ */
+ public static class NotEqualTo extends NullFalseEvaluator {
+ private static final long serialVersionUID = 1L;
+
+ public static NotEqualTo getInstance() {
+ return new NotEqualTo();
+ }
+
+ @Override
+ protected boolean eval(@NotNull Object val, ColumnStats columnStats,
LogicalType type) {
+ // because the bounds are not necessarily a min or max value, this
cannot be answered using them.
+ // notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
+ return true;
+ }
+ }
+
+  /**
+   * Evaluates {@code col IS NULL}: may match when the null count is positive.
+   */
+  public static class IsNull extends LeafEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static IsNull getInstance() {
+      return new IsNull();
+    }
+
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      return getColumnStats(columnStatsMap).getNullCnt() > 0;
+    }
+  }
+
+  /**
+   * Evaluates {@code col IS NOT NULL}: may match when a min value is recorded or
+   * when no nulls are counted.
+   */
+  public static class IsNotNull extends LeafEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static IsNotNull getInstance() {
+      return new IsNotNull();
+    }
+
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      final ColumnStats stats = getColumnStats(columnStatsMap);
+      // should consider FLOAT/DOUBLE & NAN
+      if (stats.getMinVal() != null) {
+        return true;
+      }
+      return stats.getNullCnt() <= 0;
+    }
+  }
+
+  /**
+   * Evaluates {@code col < literal}: may match when a min value is recorded and
+   * {@code min < literal}.
+   */
+  public static class LessThan extends NullFalseEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static LessThan getInstance() {
+      return new LessThan();
+    }
+
+    @Override
+    public boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type) {
+      final Object lower = columnStats.getMinVal();
+      return lower != null && compare(lower, val, type) < 0;
+    }
+  }
+
+  /**
+   * Evaluates {@code col > literal}: may match when a max value is recorded and
+   * {@code max > literal}.
+   */
+  public static class GreaterThan extends NullFalseEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static GreaterThan getInstance() {
+      return new GreaterThan();
+    }
+
+    @Override
+    protected boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type) {
+      final Object upper = columnStats.getMaxVal();
+      return upper != null && compare(upper, val, type) > 0;
+    }
+  }
+
+  /**
+   * Evaluates {@code col <= literal}: may match when a min value is recorded and
+   * {@code min <= literal}.
+   */
+  public static class LessThanOrEqual extends NullFalseEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static LessThanOrEqual getInstance() {
+      return new LessThanOrEqual();
+    }
+
+    @Override
+    protected boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type) {
+      final Object lower = columnStats.getMinVal();
+      return lower != null && compare(lower, val, type) <= 0;
+    }
+  }
+
+  /**
+   * Evaluates {@code col >= literal}: may match when a max value is recorded and
+   * {@code max >= literal}.
+   */
+  public static class GreaterThanOrEqual extends NullFalseEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static GreaterThanOrEqual getInstance() {
+      return new GreaterThanOrEqual();
+    }
+
+    @Override
+    protected boolean eval(@NotNull Object val, ColumnStats columnStats, LogicalType type) {
+      final Object upper = columnStats.getMaxVal();
+      return upper != null && compare(upper, val, type) >= 0;
+    }
+  }
+
+  /**
+   * To evaluate IN expr.
+   *
+   * <p>The file may match when at least one non-null literal of the list falls into
+   * the {@code [min, max]} range of the column.
+   */
+  public static class In extends LeafEvaluator {
+    private static final long serialVersionUID = 1L;
+
+    // above this number of literals the range check is skipped and a match is assumed
+    private static final int IN_PREDICATE_LIMIT = 200;
+
+    public static In getInstance() {
+      return new In();
+    }
+
+    // the literal values of the IN list, may contain nulls
+    private Object[] vals;
+
+    /**
+     * Decides whether any literal of the IN list can be contained in the column.
+     *
+     * <p>NULL literals are ignored instead of rejecting the whole predicate:
+     * {@code col IN (1, NULL)} is still TRUE for rows where {@code col = 1}, so only the
+     * non-null literals decide. A list holding nothing but NULLs never matches.
+     *
+     * @param columnStatsMap column statistics, keyed by column name
+     * @return true if the stats allow a match, false if the file can be skipped
+     */
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      ColumnStats columnStats = getColumnStats(columnStatsMap);
+      Object minVal = columnStats.getMinVal();
+      Object maxVal = columnStats.getMaxVal();
+      // a missing bound is treated like in EqualTo: no comparable value, no match;
+      // this also guards compare() against a null upper bound
+      if (minVal == null || maxVal == null) {
+        return false;
+      }
+
+      if (vals.length > IN_PREDICATE_LIMIT) {
+        // skip evaluating the predicate if the number of values is too big
+        return true;
+      }
+
+      return Arrays.stream(vals)
+          .filter(Objects::nonNull)
+          .anyMatch(v -> compare(minVal, v, this.type) <= 0 && compare(maxVal, v, this.type) >= 0);
+    }
+
+    /**
+     * Binds the literal values of the IN list.
+     *
+     * @param vals the literal values
+     */
+    public void bindVals(Object... vals) {
+      this.vals = vals;
+    }
+  }
+
+ /**
+ * A special evaluator which is not possible to match any condition.
+ * Its result is false regardless of the column stats passed in.
+ */
+ public static class AlwaysFalse implements Evaluator {
+ private static final long serialVersionUID = 1L;
+
+ public static AlwaysFalse getInstance() {
+ return new AlwaysFalse();
+ }
+
+ @Override
+ public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+ return false;
+ }
+ }
+
+ // component predicate
+
+  /**
+   * To evaluate NOT expr.
+   *
+   * <p>An evaluator answers "may some row match?" from min/max/null-count stats. The
+   * logical negation of that answer is NOT the answer for the negated predicate: for
+   * {@code NOT (a = 5)} and a file with range {@code [1, 10]} the child reports a possible
+   * match, yet the file also holds rows with {@code a <> 5} and must not be skipped.
+   * The stats cannot rule out such rows, so this evaluator conservatively keeps every file.
+   */
+  public static class Not implements Evaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static Not getInstance() {
+      return new Not();
+    }
+
+    // the negated child evaluator; kept for binding, not consulted for pruning
+    private Evaluator evaluator;
+
+    /**
+     * Always reports a possible match, see the class comment.
+     *
+     * @param columnStatsMap column statistics, keyed by column name
+     * @return always true
+     */
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      // never prune on a negation: !mayMatch(child) would skip files that contain
+      // both matching and non-matching rows for the child predicate
+      return true;
+    }
+
+    /**
+     * Binds the child evaluator that is negated.
+     *
+     * @param evaluator the child evaluator
+     * @return this evaluator, for chaining
+     */
+    public Evaluator bindEvaluator(Evaluator evaluator) {
+      this.evaluator = evaluator;
+      return this;
+    }
+  }
+
+  /**
+   * Evaluates AND: may match only when every bound child evaluator may match.
+   */
+  public static class And implements Evaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static And getInstance() {
+      return new And();
+    }
+
+    private Evaluator[] evaluators;
+
+    /** Evaluates the children in order and stops at the first one that returns false. */
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      return Arrays.stream(evaluators).allMatch(child -> child.eval(columnStatsMap));
+    }
+
+    /** Binds the child evaluators and returns this evaluator. */
+    public Evaluator bindEvaluator(Evaluator... evaluators) {
+      this.evaluators = evaluators;
+      return this;
+    }
+  }
+
+  /**
+   * Evaluates OR: may match as soon as one bound child evaluator may match.
+   */
+  public static class Or implements Evaluator {
+    private static final long serialVersionUID = 1L;
+
+    public static Or getInstance() {
+      return new Or();
+    }
+
+    private Evaluator[] evaluators;
+
+    /** Evaluates the children in order and stops at the first one that returns true. */
+    @Override
+    public boolean eval(Map<String, ColumnStats> columnStatsMap) {
+      return Arrays.stream(evaluators).anyMatch(child -> child.eval(columnStatsMap));
+    }
+
+    /** Binds the child evaluators and returns this evaluator. */
+    public Evaluator bindEvaluator(Evaluator... evaluators) {
+      this.evaluators = evaluators;
+      return this;
+    }
+  }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+  /**
+   * Collects the literal values of an IN expression.
+   *
+   * <p>The first child is skipped (the caller binds it as the field reference); every
+   * remaining child is cast to a literal expression and its value extracted.
+   *
+   * @param childExprs the children of the IN call
+   * @return the literal values in child order
+   */
+  private static Object[] getInLiteralVals(List<Expression> childExprs) {
+    return childExprs.stream()
+        .skip(1)
+        .map(literal -> ExpressionUtils.getValueFromLiteral((ValueLiteralExpression) literal))
+        .toArray();
+  }
+
+ private static int compare(@NotNull Object val1, @NotNull Object val2,
LogicalType logicalType) {
+ switch (logicalType.getTypeRoot()) {
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case DATE:
+ return ((Long) val1).compareTo((Long) val2);
+ case BOOLEAN:
+ return ((Boolean) val1).compareTo((Boolean) val2);
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ return ((Integer) val1).compareTo((Integer) val2);
+ case FLOAT:
+ return ((Float) val1).compareTo((Float) val2);
+ case DOUBLE:
+ return ((Double) val1).compareTo((Double) val2);
+ case BINARY:
+ case VARBINARY:
+ return compareBytes((byte[]) val1, (byte[]) val2);
+ case CHAR:
+ case VARCHAR:
+ return ((String) val1).compareTo((String) val2);
+ case DECIMAL:
+ return ((BigDecimal) val1).compareTo((BigDecimal) val2);
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " +
logicalType);
+ }
+ }
+
+ private static int compareBytes(byte[] v1, byte[] v2) {
+ int len1 = v1.length;
+ int len2 = v2.length;
+ int lim = Math.min(len1, len2);
+
+ int k = 0;
+ while (k < lim) {
+ byte c1 = v1[k];
+ byte c2 = v2[k];
+ if (c1 != c2) {
+ return c1 - c2;
+ }
+ k++;
+ }
+ return len1 - len2;
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index 9ee93719813..daedaf8fa3c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -25,9 +25,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.source.stats.ColumnStatsIndices;
-import org.apache.hudi.source.stats.ExpressionEvaluator;
import org.apache.hudi.util.DataTypeUtils;
-import org.apache.hudi.util.ExpressionUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -67,6 +65,7 @@ public class FileIndex {
private List<String> partitionPaths; // cache of partition paths
private List<ResolvedExpression> filters; // push down filters
private final boolean tableExists;
+ private DataPruner dataPruner;
private FileIndex(Path path, Configuration conf, RowType rowType) {
this.path = path;
@@ -145,9 +144,16 @@ public class FileIndex {
// no need to filter by col stats or error occurs.
return allFileStatus;
}
- return Arrays.stream(allFileStatus).parallel()
+ FileStatus[] results = Arrays.stream(allFileStatus).parallel()
.filter(fileStatus ->
candidateFiles.contains(fileStatus.getPath().getName()))
.toArray(FileStatus[]::new);
+ double totalFileSize = allFileStatus.length;
+ double resultFileSize = results.length;
+ double skippingPercent = totalFileSize != 0 ? (totalFileSize -
resultFileSize) / totalFileSize : 0;
+ LOG.info("Total files: " + totalFileSize
+ + "; candidate files after data skipping: " + resultFileSize
+ + "; skipping percent " + skippingPercent);
+ return results;
}
/**
@@ -191,6 +197,7 @@ public class FileIndex {
public void setFilters(List<ResolvedExpression> filters) {
if (filters.size() > 0) {
this.filters = new ArrayList<>(filters);
+ this.dataPruner = initializeDataPruner(filters);
}
}
@@ -213,24 +220,11 @@ public class FileIndex {
*/
@Nullable
private Set<String> candidateFilesInMetadataTable(FileStatus[]
allFileStatus) {
- // NOTE: Data Skipping is only effective when it references columns that
are indexed w/in
- // the Column Stats Index (CSI). Following cases could not be
effectively handled by Data Skipping:
- // - Expressions on top-level column's fields (ie, for ex filters
like "struct.field > 0", since
- // CSI only contains stats for top-level columns, in this case
for "struct")
- // - Any expression not directly referencing top-level column
(for ex, sub-queries, since there's
- // nothing CSI in particular could be applied for)
- if (!metadataConfig.enabled() || !dataSkippingEnabled) {
- validateConfig();
- return null;
- }
- if (this.filters == null || this.filters.size() == 0) {
- return null;
- }
- String[] referencedCols = ExpressionUtils.referencedColumns(filters);
- if (referencedCols.length == 0) {
+ if (dataPruner == null) {
return null;
}
try {
+ String[] referencedCols = dataPruner.getReferencedCols();
final List<RowData> colStats =
ColumnStatsIndices.readColumnStatsIndex(path.toString(), metadataConfig,
referencedCols);
final Pair<List<RowData>, String[]> colStatsTable =
ColumnStatsIndices.transposeColumnStatsIndex(colStats, referencedCols, rowType);
List<RowData> transposedColStats = colStatsTable.getLeft();
@@ -245,7 +239,7 @@ public class FileIndex {
.map(row -> row.getString(0).toString())
.collect(Collectors.toSet());
Set<String> candidateFileNames = transposedColStats.stream().parallel()
- .filter(row -> ExpressionEvaluator.filterExprs(filters, row,
queryFields))
+ .filter(row -> dataPruner.test(row, queryFields))
.map(row -> row.getString(0).toString())
.collect(Collectors.toSet());
@@ -303,4 +297,18 @@ public class FileIndex {
public List<ResolvedExpression> getFilters() {
return filters;
}
+
+  /**
+   * Creates the data pruner for the given filters when both the metadata table and data
+   * skipping are enabled; otherwise runs {@code validateConfig()} and returns null.
+   */
+  private DataPruner initializeDataPruner(List<ResolvedExpression> filters) {
+    // NOTE: Data Skipping is only effective when it references columns that are indexed w/in
+    //       the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
+    //          - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since
+    //          CSI only contains stats for top-level columns, in this case for "struct")
+    //          - Any expression not directly referencing top-level column (for ex, sub-queries, since there's
+    //          nothing CSI in particular could be applied for)
+    if (metadataConfig.enabled() && dataSkippingEnabled) {
+      return DataPruner.newInstance(filters);
+    }
+    validateConfig();
+    return null;
+  }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStats.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStats.java
new file mode 100644
index 00000000000..7ec5371ba12
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStats.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * Immutable holder of the statistics of one column: min value, max value and null count.
+ *
+ * <p>Both bounds may be null. Two instances are equal when all three components are equal.
+ */
+public class ColumnStats {
+
+  private final long nullCnt;
+  @Nullable private final Object maxVal;
+  @Nullable private final Object minVal;
+
+  /**
+   * @param minVal  the min value, may be null
+   * @param maxVal  the max value, may be null
+   * @param nullCnt the number of nulls
+   */
+  public ColumnStats(@Nullable Object minVal, @Nullable Object maxVal, long nullCnt) {
+    this.nullCnt = nullCnt;
+    this.maxVal = maxVal;
+    this.minVal = minVal;
+  }
+
+  /** Returns the min value, may be null. */
+  @Nullable
+  public Object getMinVal() {
+    return this.minVal;
+  }
+
+  /** Returns the max value, may be null. */
+  @Nullable
+  public Object getMaxVal() {
+    return this.maxVal;
+  }
+
+  /** Returns the number of nulls. */
+  public long getNullCnt() {
+    return this.nullCnt;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    final ColumnStats other = (ColumnStats) o;
+    if (this.nullCnt != other.nullCnt) {
+      return false;
+    }
+    return Objects.equals(this.minVal, other.minVal)
+        && Objects.equals(this.maxVal, other.maxVal);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(minVal, maxVal, nullCnt);
+  }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
deleted file mode 100644
index d1bb01126a0..00000000000
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
+++ /dev/null
@@ -1,605 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.stats;
-
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.util.ExpressionUtils;
-
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.CallExpression;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ValueLiteralExpression;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
-
-import javax.validation.constraints.NotNull;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Tool to evaluate the {@link
org.apache.flink.table.expressions.ResolvedExpression}s.
- */
-public class ExpressionEvaluator {
- private static final int IN_PREDICATE_LIMIT = 200;
-
- /**
- * Filter the index row with specific data filters and query fields.
- *
- * @param filters The pushed down data filters
- * @param indexRow The index row
- * @param queryFields The query fields referenced by the filters
- * @return true if the index row should be considered as a candidate
- */
- public static boolean filterExprs(List<ResolvedExpression> filters, RowData
indexRow, RowType.RowField[] queryFields) {
- for (ResolvedExpression filter : filters) {
- if (!Evaluator.bindCall((CallExpression) filter, indexRow,
queryFields).eval()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Used for deciding whether the literal values match the column stats.
- * The evaluator can be nested.
- */
- public abstract static class Evaluator {
- // the constant literal value
- protected Object val;
-
- // column stats
- protected Object minVal;
- protected Object maxVal;
- protected long nullCnt = 0;
-
- // referenced field type
- protected LogicalType type;
-
- /**
- * Binds the evaluator with specific call expression.
- *
- * <p>Three steps to bind the call:
- * 1. map the evaluator instance;
- * 2. bind the field reference;
- * 3. bind the column stats.
- *
- * <p>Normalize the expression to simplify the subsequent decision logic:
- * always put the literal expression in the RHS.
- */
- public static Evaluator bindCall(CallExpression call, RowData indexRow,
RowType.RowField[] queryFields) {
- FunctionDefinition funDef = call.getFunctionDefinition();
- List<Expression> childExprs = call.getChildren();
-
- boolean normalized = childExprs.get(0) instanceof
FieldReferenceExpression;
- final Evaluator evaluator;
-
- if (BuiltInFunctionDefinitions.NOT.equals(funDef)) {
- evaluator = Not.getInstance();
- Evaluator childEvaluator = bindCall((CallExpression)
childExprs.get(0), indexRow, queryFields);
- return ((Not) evaluator).bindEvaluator(childEvaluator);
- }
-
- if (BuiltInFunctionDefinitions.AND.equals(funDef)) {
- evaluator = And.getInstance();
- Evaluator evaluator1 = bindCall((CallExpression) childExprs.get(0),
indexRow, queryFields);
- Evaluator evaluator2 = bindCall((CallExpression) childExprs.get(1),
indexRow, queryFields);
- return ((And) evaluator).bindEvaluator(evaluator1, evaluator2);
- }
-
- if (BuiltInFunctionDefinitions.OR.equals(funDef)) {
- evaluator = Or.getInstance();
- Evaluator evaluator1 = bindCall((CallExpression) childExprs.get(0),
indexRow, queryFields);
- Evaluator evaluator2 = bindCall((CallExpression) childExprs.get(1),
indexRow, queryFields);
- return ((Or) evaluator).bindEvaluator(evaluator1, evaluator2);
- }
-
- // handle unary operators
- if (BuiltInFunctionDefinitions.IS_NULL.equals(funDef)) {
- FieldReferenceExpression rExpr = (FieldReferenceExpression)
childExprs.get(0);
- return IsNull.getInstance()
- .bindFieldReference(rExpr)
- .bindColStats(indexRow, queryFields, rExpr);
- } else if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(funDef)) {
- FieldReferenceExpression rExpr = (FieldReferenceExpression)
childExprs.get(0);
- return IsNotNull.getInstance()
- .bindFieldReference(rExpr)
- .bindColStats(indexRow, queryFields, rExpr);
- }
-
- boolean hasNullLiteral =
- childExprs.stream().anyMatch(e ->
- e instanceof ValueLiteralExpression
- &&
ExpressionUtils.getValueFromLiteral((ValueLiteralExpression) e) == null);
- if (hasNullLiteral) {
- evaluator = AlwaysFalse.getInstance();
- return evaluator;
- }
-
- // handle IN specifically
- if (BuiltInFunctionDefinitions.IN.equals(funDef)) {
- ValidationUtils.checkState(normalized, "The IN expression expects to
be normalized");
- evaluator = In.getInstance();
- FieldReferenceExpression rExpr = (FieldReferenceExpression)
childExprs.get(0);
- evaluator.bindFieldReference(rExpr);
- ((In) evaluator).bindVals(getInLiteralVals(childExprs));
- return evaluator.bindColStats(indexRow, queryFields, rExpr);
- }
-
- // handle binary operators
- if (BuiltInFunctionDefinitions.EQUALS.equals(funDef)) {
- evaluator = EqualTo.getInstance();
- } else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(funDef)) {
- evaluator = NotEqualTo.getInstance();
- } else if (BuiltInFunctionDefinitions.LESS_THAN.equals(funDef)) {
- evaluator = normalized ? LessThan.getInstance() :
GreaterThan.getInstance();
- } else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(funDef)) {
- evaluator = normalized ? GreaterThan.getInstance() :
LessThan.getInstance();
- } else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(funDef))
{
- evaluator = normalized ? LessThanOrEqual.getInstance() :
GreaterThanOrEqual.getInstance();
- } else if
(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(funDef)) {
- evaluator = normalized ? GreaterThanOrEqual.getInstance() :
LessThanOrEqual.getInstance();
- } else {
- throw new AssertionError("Unexpected function definition " + funDef);
- }
- FieldReferenceExpression rExpr = normalized
- ? (FieldReferenceExpression) childExprs.get(0)
- : (FieldReferenceExpression) childExprs.get(1);
- ValueLiteralExpression vExpr = normalized
- ? (ValueLiteralExpression) childExprs.get(1)
- : (ValueLiteralExpression) childExprs.get(0);
- evaluator
- .bindFieldReference(rExpr)
- .bindVal(vExpr)
- .bindColStats(indexRow, queryFields, rExpr);
- return evaluator;
- }
-
- public Evaluator bindColStats(
- RowData indexRow,
- RowType.RowField[] queryFields,
- FieldReferenceExpression expr) {
- int colPos = -1;
- for (int i = 0; i < queryFields.length; i++) {
- if (expr.getName().equals(queryFields[i].getName())) {
- colPos = i;
- }
- }
- ValidationUtils.checkState(colPos != -1, "Can not find column " +
expr.getName());
- int startPos = 2 + colPos * 3;
- LogicalType colType = queryFields[colPos].getType();
- Object minVal = indexRow.isNullAt(startPos) ? null :
getValAsJavaObj(indexRow, startPos, colType);
- Object maxVal = indexRow.isNullAt(startPos + 1) ? null :
getValAsJavaObj(indexRow, startPos + 1, colType);
- long nullCnt = indexRow.getLong(startPos + 2);
-
- this.minVal = minVal;
- this.maxVal = maxVal;
- this.nullCnt = nullCnt;
- return this;
- }
-
- public Evaluator bindVal(ValueLiteralExpression vExpr) {
- this.val = ExpressionUtils.getValueFromLiteral(vExpr);
- return this;
- }
-
- public Evaluator bindFieldReference(FieldReferenceExpression expr) {
- this.type = expr.getOutputDataType().getLogicalType();
- return this;
- }
-
- public abstract boolean eval();
- }
-
- public abstract static class NullFalseEvaluator extends Evaluator {
-
- @Override
- public final boolean eval() {
- if (this.val == null) {
- return false;
- } else {
- return eval(this.val);
- }
- }
-
- protected abstract boolean eval(@NotNull Object val);
- }
-
- /**
- * To evaluate = expr.
- */
- public static class EqualTo extends NullFalseEvaluator {
-
- public static EqualTo getInstance() {
- return new EqualTo();
- }
-
- @Override
- protected boolean eval(@NotNull Object val) {
- if (this.minVal == null || this.maxVal == null) {
- return false;
- }
- if (compare(this.minVal, val, this.type) > 0) {
- return false;
- }
- return compare(this.maxVal, val, this.type) >= 0;
- }
- }
-
- /**
- * To evaluate <> expr.
- */
- public static class NotEqualTo extends NullFalseEvaluator {
- public static NotEqualTo getInstance() {
- return new NotEqualTo();
- }
-
- @Override
- protected boolean eval(@NotNull Object val) {
- // because the bounds are not necessarily a min or max value, this
cannot be answered using
- // them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value
in col.
- return true;
- }
- }
-
- /**
- * To evaluate IS NULL expr.
- */
- public static class IsNull extends Evaluator {
- public static IsNull getInstance() {
- return new IsNull();
- }
-
- @Override
- public boolean eval() {
- return this.nullCnt > 0;
- }
- }
-
- /**
- * To evaluate IS NOT NULL expr.
- */
- public static class IsNotNull extends Evaluator {
- public static IsNotNull getInstance() {
- return new IsNotNull();
- }
-
- @Override
- public boolean eval() {
- // should consider FLOAT/DOUBLE & NAN
- return this.minVal != null || this.nullCnt <= 0;
- }
- }
-
- /**
- * To evaluate < expr.
- */
- public static class LessThan extends NullFalseEvaluator {
- public static LessThan getInstance() {
- return new LessThan();
- }
-
- @Override
- public boolean eval(@NotNull Object val) {
- if (this.minVal == null) {
- return false;
- }
- return compare(this.minVal, val, this.type) < 0;
- }
- }
-
- /**
- * To evaluate > expr.
- */
- public static class GreaterThan extends NullFalseEvaluator {
- public static GreaterThan getInstance() {
- return new GreaterThan();
- }
-
- @Override
- protected boolean eval(@NotNull Object val) {
- if (this.maxVal == null) {
- return false;
- }
- return compare(this.maxVal, val, this.type) > 0;
- }
- }
-
- /**
- * To evaluate <= expr.
- */
- public static class LessThanOrEqual extends NullFalseEvaluator {
- public static LessThanOrEqual getInstance() {
- return new LessThanOrEqual();
- }
-
- @Override
- protected boolean eval(@NotNull Object val) {
- if (this.minVal == null) {
- return false;
- }
- return compare(this.minVal, val, this.type) <= 0;
- }
- }
-
- /**
- * To evaluate >= expr.
- */
- public static class GreaterThanOrEqual extends NullFalseEvaluator {
- public static GreaterThanOrEqual getInstance() {
- return new GreaterThanOrEqual();
- }
-
- @Override
- protected boolean eval(@NotNull Object val) {
- if (this.maxVal == null) {
- return false;
- }
- return compare(this.maxVal, val, this.type) >= 0;
- }
- }
-
- /**
- * To evaluate IN expr.
- */
- public static class In extends Evaluator {
- public static In getInstance() {
- return new In();
- }
-
- private Object[] vals;
-
- @Override
- public boolean eval() {
- if (Arrays.stream(vals).anyMatch(Objects::isNull)) {
- return false;
- }
- if (this.minVal == null) {
- return false; // values are all null and literalSet cannot contain
null.
- }
-
- if (vals.length > IN_PREDICATE_LIMIT) {
- // skip evaluating the predicate if the number of values is too big
- return true;
- }
-
- vals = Arrays.stream(vals).filter(v -> compare(this.minVal, v,
this.type) <= 0).toArray();
- if (vals.length == 0) { // if all values are less than lower bound, rows
cannot match.
- return false;
- }
-
- vals = Arrays.stream(vals).filter(v -> compare(this.maxVal, v,
this.type) >= 0).toArray();
- if (vals.length == 0) { // if all remaining values are greater than
upper bound, rows cannot match.
- return false;
- }
-
- return true;
- }
-
- public void bindVals(Object... vals) {
- this.vals = vals;
- }
- }
-
- // A special evaluator which is not possible to match any condition
- public static class AlwaysFalse extends Evaluator {
-
- public static AlwaysFalse getInstance() {
- return new AlwaysFalse();
- }
-
- @Override
- public Evaluator bindColStats(
- RowData indexRow,
- RowType.RowField[] queryFields,
- FieldReferenceExpression expr) {
- // this no need to do anything
- return this;
- }
-
- @Override
- public boolean eval() {
- return false;
- }
- }
-
- // component predicate
-
- /**
- * To evaluate NOT expr.
- */
- public static class Not extends Evaluator {
- public static Not getInstance() {
- return new Not();
- }
-
- private Evaluator evaluator;
-
- @Override
- public boolean eval() {
- return !this.evaluator.eval();
- }
-
- public Evaluator bindEvaluator(Evaluator evaluator) {
- this.evaluator = evaluator;
- return this;
- }
- }
-
- /**
- * To evaluate AND expr.
- */
- public static class And extends Evaluator {
- public static And getInstance() {
- return new And();
- }
-
- private Evaluator[] evaluators;
-
- @Override
- public boolean eval() {
- for (Evaluator evaluator : evaluators) {
- if (!evaluator.eval()) {
- return false;
- }
- }
- return true;
- }
-
- public Evaluator bindEvaluator(Evaluator... evaluators) {
- this.evaluators = evaluators;
- return this;
- }
- }
-
- /**
- * To evaluate OR expr.
- */
- public static class Or extends Evaluator {
- public static Or getInstance() {
- return new Or();
- }
-
- private Evaluator[] evaluators;
-
- @Override
- public boolean eval() {
- for (Evaluator evaluator : evaluators) {
- if (evaluator.eval()) {
- return true;
- }
- }
- return false;
- }
-
- public Evaluator bindEvaluator(Evaluator... evaluators) {
- this.evaluators = evaluators;
- return this;
- }
- }
-
- // -------------------------------------------------------------------------
- // Utilities
- // -------------------------------------------------------------------------
-
- private static int compare(@NotNull Object val1, @NotNull Object val2,
LogicalType logicalType) {
- switch (logicalType.getTypeRoot()) {
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIME_WITHOUT_TIME_ZONE:
- case DATE:
- return ((Long) val1).compareTo((Long) val2);
- case BOOLEAN:
- return ((Boolean) val1).compareTo((Boolean) val2);
- case TINYINT:
- case SMALLINT:
- case INTEGER:
- return ((Integer) val1).compareTo((Integer) val2);
- case FLOAT:
- return ((Float) val1).compareTo((Float) val2);
- case DOUBLE:
- return ((Double) val1).compareTo((Double) val2);
- case BINARY:
- case VARBINARY:
- return compareBytes((byte[]) val1, (byte[]) val2);
- case CHAR:
- case VARCHAR:
- return ((String) val1).compareTo((String) val2);
- case DECIMAL:
- return ((BigDecimal) val1).compareTo((BigDecimal) val2);
- default:
- throw new UnsupportedOperationException("Unsupported type: " +
logicalType);
- }
- }
-
- private static int compareBytes(byte[] v1, byte[] v2) {
- int len1 = v1.length;
- int len2 = v2.length;
- int lim = Math.min(len1, len2);
-
- int k = 0;
- while (k < lim) {
- byte c1 = v1[k];
- byte c2 = v2[k];
- if (c1 != c2) {
- return c1 - c2;
- }
- k++;
- }
- return len1 - len2;
- }
-
- /**
- * Returns the IN expression literal values.
- */
- private static Object[] getInLiteralVals(List<Expression> childExprs) {
- List<Object> vals = new ArrayList<>();
- for (int i = 1; i < childExprs.size(); i++) {
- vals.add(ExpressionUtils.getValueFromLiteral((ValueLiteralExpression)
childExprs.get(i)));
- }
- return vals.toArray();
- }
-
- /**
- * Returns the value as Java object at position {@code pos} of row {@code
indexRow}.
- */
- private static Object getValAsJavaObj(RowData indexRow, int pos, LogicalType
colType) {
- switch (colType.getTypeRoot()) {
- // NOTE: Since we can't rely on Avro's "date", and "timestamp-micros"
logical-types, we're
- // manually encoding corresponding values as int and long w/in the
Column Stats Index and
- // here we have to decode those back into corresponding logical
representation.
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- TimestampType tsType = (TimestampType) colType;
- return indexRow.getTimestamp(pos,
tsType.getPrecision()).getMillisecond();
- case TIME_WITHOUT_TIME_ZONE:
- case DATE:
- return indexRow.getLong(pos);
- // NOTE: All integral types of size less than Int are encoded as Ints in
MT
- case BOOLEAN:
- return indexRow.getBoolean(pos);
- case TINYINT:
- case SMALLINT:
- case INTEGER:
- return indexRow.getInt(pos);
- case FLOAT:
- return indexRow.getFloat(pos);
- case DOUBLE:
- return indexRow.getDouble(pos);
- case BINARY:
- case VARBINARY:
- return indexRow.getBinary(pos);
- case CHAR:
- case VARCHAR:
- return indexRow.getString(pos).toString();
- case DECIMAL:
- DecimalType decimalType = (DecimalType) colType;
- return indexRow.getDecimal(pos, decimalType.getPrecision(),
decimalType.getScale()).toBigDecimal();
- default:
- throw new UnsupportedOperationException("Unsupported type: " +
colType);
- }
- }
-}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
new file mode 100644
index 00000000000..eef8e6df48c
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.source.DataPruner.convertColumnStats;
+import static org.apache.hudi.source.ExpressionEvaluators.fromExpression;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link
org.apache.hudi.source.ExpressionEvaluators.Evaluator}.
+ */
+public class TestExpressionEvaluators {
+ // Table schema: thirteen fields (f_tinyint ... f_timestamp) covering
+ // integral, floating point, boolean, decimal, binary, string and temporal
+ // types.
+ private static final DataType ROW_DATA_TYPE = DataTypes.ROW(
+ DataTypes.FIELD("f_tinyint", DataTypes.TINYINT()),
+ DataTypes.FIELD("f_smallint", DataTypes.SMALLINT()),
+ DataTypes.FIELD("f_int", DataTypes.INT()),
+ DataTypes.FIELD("f_long", DataTypes.BIGINT()),
+ DataTypes.FIELD("f_float", DataTypes.FLOAT()),
+ DataTypes.FIELD("f_double", DataTypes.DOUBLE()),
+ DataTypes.FIELD("f_boolean", DataTypes.BOOLEAN()),
+ DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(10, 2)),
+ DataTypes.FIELD("f_bytes", DataTypes.VARBINARY(10)),
+ DataTypes.FIELD("f_string", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("f_time", DataTypes.TIME(3)),
+ DataTypes.FIELD("f_date", DataTypes.DATE()),
+ DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3))
+ ).notNull();
+ // Index row layout: file_name, value_cnt, followed by one
+ // (min, max, null_cnt) triple for each of f_int, f_string and f_timestamp.
+ private static final DataType INDEX_ROW_DATA_TYPE = DataTypes.ROW(
+ DataTypes.FIELD("file_name", DataTypes.STRING()),
+ DataTypes.FIELD("value_cnt", DataTypes.BIGINT()),
+ DataTypes.FIELD("f_int_min", DataTypes.INT()),
+ DataTypes.FIELD("f_int_max", DataTypes.INT()),
+ DataTypes.FIELD("f_int_null_cnt", DataTypes.BIGINT()),
+ DataTypes.FIELD("f_string_min", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("f_string_max", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("f_string_null_cnt", DataTypes.BIGINT()),
+ DataTypes.FIELD("f_timestamp_min", DataTypes.TIMESTAMP(3)),
+ DataTypes.FIELD("f_timestamp_max", DataTypes.TIMESTAMP(3)),
+ DataTypes.FIELD("f_timestamp_null_cnt", DataTypes.BIGINT())
+ ).notNull();
+
+ // Logical row type derived from INDEX_ROW_DATA_TYPE.
+ private static final RowType INDEX_ROW_TYPE = (RowType)
INDEX_ROW_DATA_TYPE.getLogicalType();
+
+ /**
+ * Checks {@code f_int = 12} against several f_int column stats: the
+ * evaluator is expected to return true when 12 lies within [min, max]
+ * (bounds inclusive) and false when 12 is outside the range, when min/max
+ * are null, or when the literal itself is NULL.
+ *
+ * <p>NOTE(review): {@code intIndexRow} and {@code queryFields} are helpers
+ * defined outside this view; presumably they build an index row with the
+ * given f_int min/max and select the f_int field -- confirm.
+ */
+ @Test
+ void testEqualTo() {
+ ExpressionEvaluators.EqualTo equalTo =
ExpressionEvaluators.EqualTo.getInstance();
+ FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
+ ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+ equalTo.bindVal(vExpr)
+ .bindFieldReference(rExpr);
+ RowData indexRow1 = intIndexRow(11, 13);
+ Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1,
queryFields(2));
+ assertTrue(equalTo.eval(stats1), "11 < 12 < 13");
+
+ RowData indexRow2 = intIndexRow(12, 13);
+ Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2,
queryFields(2));
+ assertTrue(equalTo.eval(stats2), "12 <= 12 < 13");
+
+ RowData indexRow3 = intIndexRow(11, 12);
+ Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3,
queryFields(2));
+ assertTrue(equalTo.eval(stats3), "11 < 12 <= 12");
+
+ RowData indexRow4 = intIndexRow(10, 11);
+ Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4,
queryFields(2));
+ assertFalse(equalTo.eval(stats4), "11 < 12");
+
+ RowData indexRow5 = intIndexRow(13, 14);
+ Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5,
queryFields(2));
+ assertFalse(equalTo.eval(stats5), "12 < 13");
+
+ RowData indexRow6 = intIndexRow(null, null);
+ Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6,
queryFields(2));
+ assertFalse(equalTo.eval(stats6), "12 <> null");
+
+ // rebind the same evaluator to a NULL literal and re-check against stats1
+ equalTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
+ assertFalse(equalTo.eval(stats1), "It is not possible to test for NULL
values with '=' operator");
+ }
+
+ /**
+ * Checks {@code f_int <> 12} against several f_int column stats: the
+ * evaluator is expected to return true for every min/max combination
+ * (including null min/max), and false only when the literal is NULL.
+ */
+ @Test
+ void testNotEqualTo() {
+ ExpressionEvaluators.NotEqualTo notEqualTo =
ExpressionEvaluators.NotEqualTo.getInstance();
+ FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
+ ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+ RowData indexRow1 = intIndexRow(11, 13);
+ notEqualTo.bindVal(vExpr)
+ .bindFieldReference(rExpr);
+ Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1,
queryFields(2));
+ assertTrue(notEqualTo.eval(stats1), "11 <> 12 && 12 <> 13");
+
+ RowData indexRow2 = intIndexRow(12, 13);
+ Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2,
queryFields(2));
+ assertTrue(notEqualTo.eval(stats2), "12 <> 13");
+
+ RowData indexRow3 = intIndexRow(11, 12);
+ Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3,
queryFields(2));
+ assertTrue(notEqualTo.eval(stats3), "11 <> 12");
+
+ RowData indexRow4 = intIndexRow(10, 11);
+ Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4,
queryFields(2));
+ assertTrue(notEqualTo.eval(stats4), "10 <> 12 and 11 < 12");
+
+ RowData indexRow5 = intIndexRow(13, 14);
+ Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5,
queryFields(2));
+ assertTrue(notEqualTo.eval(stats5), "12 <> 13 and 12 <> 14");
+
+ RowData indexRow6 = intIndexRow(null, null);
+ Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6,
queryFields(2));
+ assertTrue(notEqualTo.eval(stats6), "12 <> null");
+
+ // rebind the same evaluator to a NULL literal and re-check against stats1
+ notEqualTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
+ assertFalse(notEqualTo.eval(stats1), "It is not possible to test for NULL
values with '<>' operator");
+ }
+
+ /**
+ * Checks {@code f_int IS NULL}: expected true for the stats built from
+ * {@code intIndexRow(11, 13)} and false when the null count is explicitly 0.
+ *
+ * <p>NOTE(review): the two-argument {@code intIndexRow} helper is defined
+ * outside this view; the "2 nulls" assertion message suggests it defaults to
+ * a null count of 2 -- confirm.
+ */
+ @Test
+ void testIsNull() {
+ ExpressionEvaluators.IsNull isNull =
ExpressionEvaluators.IsNull.getInstance();
+ FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
+
+ RowData indexRow1 = intIndexRow(11, 13);
+ isNull.bindFieldReference(rExpr);
+ Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1,
queryFields(2));
+ assertTrue(isNull.eval(stats1), "2 nulls");
+
+ RowData indexRow2 = intIndexRow(12, 13, 0L);
+ Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2,
queryFields(2));
+ assertFalse(isNull.eval(stats2), "0 nulls");
+ }
+
+ /**
+ * Checks {@code f_int IS NOT NULL}: expected true when the min value is
+ * present, and also true when min/max are null but the null count is 0.
+ */
+ @Test
+ void testIsNotNull() {
+ ExpressionEvaluators.IsNotNull isNotNull =
ExpressionEvaluators.IsNotNull.getInstance();
+ FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
+
+ RowData indexRow1 = intIndexRow(11, 13);
+ isNotNull.bindFieldReference(rExpr);
+ Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1,
queryFields(2));
+ assertTrue(isNotNull.eval(stats1), "min 11 is not null");
+
+ RowData indexRow2 = intIndexRow(null, null, 0L);
+ Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2,
queryFields(2));
+ assertTrue(isNotNull.eval(stats2), "min is null and 0 nulls");
+ }
+
+ /**
+ * Checks {@code f_int < 12} against several f_int column stats: the
+ * evaluator is expected to return true only when the min value is strictly
+ * below 12, and false when min is 12 or greater, when min/max are null, or
+ * when the literal is NULL.
+ */
+ @Test
+ void testLessThan() {
+ ExpressionEvaluators.LessThan lessThan =
ExpressionEvaluators.LessThan.getInstance();
+ FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
+ ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+ RowData indexRow1 = intIndexRow(11, 13);
+ lessThan.bindVal(vExpr)
+ .bindFieldReference(rExpr);
+ Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1,
queryFields(2));
+ assertTrue(lessThan.eval(stats1), "12 < 13");
+
+ RowData indexRow2 = intIndexRow(12, 13);
+ Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2,
queryFields(2));
+ assertFalse(lessThan.eval(stats2), "min 12 = 12");
+
+ RowData indexRow3 = intIndexRow(11, 12);
+ Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3,
queryFields(2));
+ assertTrue(lessThan.eval(stats3), "11 < 12");
+
+ RowData indexRow4 = intIndexRow(10, 11);
+ Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4,
queryFields(2));
+ assertTrue(lessThan.eval(stats4), "11 < 12");
+
+ RowData indexRow5 = intIndexRow(13, 14);
+ Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5,
queryFields(2));
+ assertFalse(lessThan.eval(stats5), "12 < min 13");
+
+ RowData indexRow6 = intIndexRow(null, null);
+ Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6,
queryFields(2));
+ assertFalse(lessThan.eval(stats6), "12 <> null");
+
+ // rebind the same evaluator to a NULL literal and re-check against stats1
+ lessThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
+ assertFalse(lessThan.eval(stats1), "It is not possible to test for NULL
values with '<' operator");
+ }
+
+ /**
+ * Checks {@code f_int > 12} against several f_int column stats: the
+ * evaluator is expected to return true only when the max value is strictly
+ * above 12, and false when max is 12 or smaller, when min/max are null, or
+ * when the literal is NULL.
+ */
+ @Test
+ void testGreaterThan() {
+ ExpressionEvaluators.GreaterThan greaterThan =
ExpressionEvaluators.GreaterThan.getInstance();
+ FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
+ ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+ RowData indexRow1 = intIndexRow(11, 13);
+ greaterThan.bindVal(vExpr)
+ .bindFieldReference(rExpr);
+ Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1,
queryFields(2));
+ assertTrue(greaterThan.eval(stats1), "12 < 13");
+
+ RowData indexRow2 = intIndexRow(12, 13);
+ Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2,
queryFields(2));
+ assertTrue(greaterThan.eval(stats2), "12 < 13");
+
+ RowData indexRow3 = intIndexRow(11, 12);
+ Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3,
queryFields(2));
+ assertFalse(greaterThan.eval(stats3), "max 12 = 12");
+
+ RowData indexRow4 = intIndexRow(10, 11);
+ Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4,
queryFields(2));
+ assertFalse(greaterThan.eval(stats4), "max 11 < 12");
+
+ RowData indexRow5 = intIndexRow(13, 14);
+ Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5,
queryFields(2));
+ assertTrue(greaterThan.eval(stats5), "12 < 13");
+
+ RowData indexRow6 = intIndexRow(null, null);
+ Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6,
queryFields(2));
+ assertFalse(greaterThan.eval(stats6), "12 <> null");
+
+ // rebind the same evaluator to a NULL literal and re-check against stats1
+ greaterThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
+ assertFalse(greaterThan.eval(stats1), "It is not possible to test for NULL
values with '>' operator");
+ }
+
+ /**
+ * Checks {@code f_int <= 12} against several f_int column stats: the
+ * evaluator is expected to return true when the min value is 12 or smaller,
+ * and false when min is above 12, when min/max are null, or when the
+ * literal is NULL.
+ */
+ @Test
+ void testLessThanOrEqual() {
+ ExpressionEvaluators.LessThanOrEqual lessThanOrEqual =
ExpressionEvaluators.LessThanOrEqual.getInstance();
+ FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
+ ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+ RowData indexRow1 = intIndexRow(11, 13);
+ lessThanOrEqual.bindVal(vExpr)
+ .bindFieldReference(rExpr);
+ Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1,
queryFields(2));
+ assertTrue(lessThanOrEqual.eval(stats1), "11 < 12");
+
+ RowData indexRow2 = intIndexRow(12, 13);
+ Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2,
queryFields(2));
+ assertTrue(lessThanOrEqual.eval(stats2), "min 12 = 12");
+
+ RowData indexRow3 = intIndexRow(11, 12);
+ Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3,
queryFields(2));
+ assertTrue(lessThanOrEqual.eval(stats3), "max 12 = 12");
+
+ RowData indexRow4 = intIndexRow(10, 11);
+ Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4,
queryFields(2));
+ assertTrue(lessThanOrEqual.eval(stats4), "max 11 < 12");
+
+ RowData indexRow5 = intIndexRow(13, 14);
+ Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5,
queryFields(2));
+ assertFalse(lessThanOrEqual.eval(stats5), "12 < 13");
+
+ RowData indexRow6 = intIndexRow(null, null);
+ Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6,
queryFields(2));
+ assertFalse(lessThanOrEqual.eval(stats6), "12 <> null");
+
+ // rebind the same evaluator to a NULL literal and re-check against stats1
+ lessThanOrEqual.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
+ assertFalse(lessThanOrEqual.eval(stats1), "It is not possible to test for
NULL values with '<=' operator");
+ }
+
+ /**
+ * Checks {@code f_int >= 12} against several f_int column stats: the
+ * evaluator is expected to return true when the max value is 12 or greater,
+ * and false when max is below 12, when min/max are null, or when the
+ * literal is NULL.
+ */
+ @Test
+ void testGreaterThanOrEqual() {
+ ExpressionEvaluators.GreaterThanOrEqual greaterThanOrEqual =
ExpressionEvaluators.GreaterThanOrEqual.getInstance();
+ FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
+ ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
+
+ RowData indexRow1 = intIndexRow(11, 13);
+ greaterThanOrEqual.bindVal(vExpr)
+ .bindFieldReference(rExpr);
+ Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1,
queryFields(2));
+ assertTrue(greaterThanOrEqual.eval(stats1), "12 < 13");
+
+ RowData indexRow2 = intIndexRow(12, 13);
+ Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2,
queryFields(2));
+ assertTrue(greaterThanOrEqual.eval(stats2), "min 12 = 12");
+
+ RowData indexRow3 = intIndexRow(11, 12);
+ Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3,
queryFields(2));
+ assertTrue(greaterThanOrEqual.eval(stats3), "max 12 = 12");
+
+ RowData indexRow4 = intIndexRow(10, 11);
+ Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4,
queryFields(2));
+ assertFalse(greaterThanOrEqual.eval(stats4), "max 11 < 12");
+
+ RowData indexRow5 = intIndexRow(13, 14);
+ Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5,
queryFields(2));
+ assertTrue(greaterThanOrEqual.eval(stats5), "12 < 13");
+
+ RowData indexRow6 = intIndexRow(null, null);
+ Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6,
queryFields(2));
+ assertFalse(greaterThanOrEqual.eval(stats6), "12 <> null");
+
+ // rebind the same evaluator to a NULL literal and re-check against stats1
+ greaterThanOrEqual.bindVal(new ValueLiteralExpression(null,
DataTypes.INT()));
+ assertFalse(greaterThanOrEqual.eval(stats1), "It is not possible to test
for NULL values with '>=' operator");
+ }
+
+ @Test
+ void testIn() {
+ ExpressionEvaluators.In in = ExpressionEvaluators.In.getInstance();
+ FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
+
+ RowData indexRow1 = intIndexRow(11, 13);
+ in.bindFieldReference(rExpr);
+ in.bindVals(11, 12);
+ Map<String, ColumnStats> stats1 = convertColumnStats(indexRow1,
queryFields(2));
+ assertTrue(in.eval(stats1), "11 < 12 < 13");
+
+ RowData indexRow2 = intIndexRow(12, 13);
+ Map<String, ColumnStats> stats2 = convertColumnStats(indexRow2,
queryFields(2));
+ assertTrue(in.eval(stats2), "min 12 = 12");
+
+ RowData indexRow3 = intIndexRow(11, 12);
+ Map<String, ColumnStats> stats3 = convertColumnStats(indexRow3,
queryFields(2));
+ assertTrue(in.eval(stats3), "max 12 = 12");
+
+ RowData indexRow4 = intIndexRow(10, 11);
+ Map<String, ColumnStats> stats4 = convertColumnStats(indexRow4,
queryFields(2));
+ assertTrue(in.eval(stats4), "max 11 = 11");
+
+ RowData indexRow5 = intIndexRow(13, 14);
+ Map<String, ColumnStats> stats5 = convertColumnStats(indexRow5,
queryFields(2));
+ assertFalse(in.eval(stats5), "12 < 13");
+
+ RowData indexRow6 = intIndexRow(null, null);
+ Map<String, ColumnStats> stats6 = convertColumnStats(indexRow6,
queryFields(2));
+ assertFalse(in.eval(stats6), "12 <> null");
+
+ in.bindVals((Object) null);
+ assertFalse(in.eval(stats1), "It is not possible to test for NULL values
with 'in' operator");
+ }
+
+ @Test
+ void testAlwaysFalse() {
+ FieldReferenceExpression ref = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
+ ValueLiteralExpression nullLiteral = new ValueLiteralExpression(null,
DataTypes.INT());
+ RowData indexRow = intIndexRow(11, 13);
+ Map<String, ColumnStats> stats = convertColumnStats(indexRow,
queryFields(2));
+ FunctionDefinition[] funDefs = new FunctionDefinition[] {
+ BuiltInFunctionDefinitions.EQUALS,
+ BuiltInFunctionDefinitions.NOT_EQUALS,
+ BuiltInFunctionDefinitions.LESS_THAN,
+ BuiltInFunctionDefinitions.GREATER_THAN,
+ BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+ BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
+ BuiltInFunctionDefinitions.IN};
+ for (FunctionDefinition funDef : funDefs) {
+ CallExpression expr =
+ new CallExpression(
+ funDef,
+ Arrays.asList(ref, nullLiteral),
+ DataTypes.BOOLEAN());
+ // always return false if the literal value is null
+ assertFalse(fromExpression(expr).eval(stats));
+ }
+ }
+
+ private static RowData intIndexRow(Integer minVal, Integer maxVal) {
+ return intIndexRow(minVal, maxVal, 2L);
+ }
+
+ private static RowData intIndexRow(Integer minVal, Integer maxVal, Long
nullCnt) {
+ return indexRow(StringData.fromString("f1"), 100L,
+ minVal, maxVal, nullCnt,
+ StringData.fromString("1"), StringData.fromString("100"), 5L,
+ TimestampData.fromEpochMillis(1), TimestampData.fromEpochMillis(100),
3L);
+ }
+
+ private static RowData indexRow(Object... fields) {
+ return TestData.insertRow(INDEX_ROW_TYPE, fields);
+ }
+
+ private static RowType.RowField[] queryFields(int... pos) {
+ List<RowType.RowField> fields = ((RowType)
ROW_DATA_TYPE.getLogicalType()).getFields();
+ return
Arrays.stream(pos).mapToObj(fields::get).toArray(RowType.RowField[]::new);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
deleted file mode 100644
index e1fe27c3bb5..00000000000
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.source.stats;
-
-import org.apache.hudi.utils.TestData;
-
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.expressions.CallExpression;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ValueLiteralExpression;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static
org.apache.hudi.source.stats.ExpressionEvaluator.Evaluator.bindCall;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Test cases for {@link ExpressionEvaluator}.
- */
-public class TestExpressionEvaluator {
- private static final DataType ROW_DATA_TYPE = DataTypes.ROW(
- DataTypes.FIELD("f_tinyint", DataTypes.TINYINT()),
- DataTypes.FIELD("f_smallint", DataTypes.SMALLINT()),
- DataTypes.FIELD("f_int", DataTypes.INT()),
- DataTypes.FIELD("f_long", DataTypes.BIGINT()),
- DataTypes.FIELD("f_float", DataTypes.FLOAT()),
- DataTypes.FIELD("f_double", DataTypes.DOUBLE()),
- DataTypes.FIELD("f_boolean", DataTypes.BOOLEAN()),
- DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(10, 2)),
- DataTypes.FIELD("f_bytes", DataTypes.VARBINARY(10)),
- DataTypes.FIELD("f_string", DataTypes.VARCHAR(10)),
- DataTypes.FIELD("f_time", DataTypes.TIME(3)),
- DataTypes.FIELD("f_date", DataTypes.DATE()),
- DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3))
- ).notNull();
- private static final DataType INDEX_ROW_DATA_TYPE = DataTypes.ROW(
- DataTypes.FIELD("file_name", DataTypes.STRING()),
- DataTypes.FIELD("value_cnt", DataTypes.BIGINT()),
- DataTypes.FIELD("f_int_min", DataTypes.INT()),
- DataTypes.FIELD("f_int_max", DataTypes.INT()),
- DataTypes.FIELD("f_int_null_cnt", DataTypes.BIGINT()),
- DataTypes.FIELD("f_string_min", DataTypes.VARCHAR(10)),
- DataTypes.FIELD("f_string_max", DataTypes.VARCHAR(10)),
- DataTypes.FIELD("f_string_null_cnt", DataTypes.BIGINT()),
- DataTypes.FIELD("f_timestamp_min", DataTypes.TIMESTAMP(3)),
- DataTypes.FIELD("f_timestamp_max", DataTypes.TIMESTAMP(3)),
- DataTypes.FIELD("f_timestamp_null_cnt", DataTypes.BIGINT())
- ).notNull();
-
- private static final RowType INDEX_ROW_TYPE = (RowType)
INDEX_ROW_DATA_TYPE.getLogicalType();
-
- @Test
- void testEqualTo() {
- ExpressionEvaluator.EqualTo equalTo =
ExpressionEvaluator.EqualTo.getInstance();
- FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
- ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
- RowData indexRow1 = intIndexRow(11, 13);
- equalTo.bindFieldReference(rExpr)
- .bindVal(vExpr)
- .bindColStats(indexRow1, queryFields(2), rExpr);
- assertTrue(equalTo.eval(), "11 < 12 < 13");
-
- RowData indexRow2 = intIndexRow(12, 13);
- equalTo.bindColStats(indexRow2, queryFields(2), rExpr);
- assertTrue(equalTo.eval(), "12 <= 12 < 13");
-
- RowData indexRow3 = intIndexRow(11, 12);
- equalTo.bindColStats(indexRow3, queryFields(2), rExpr);
- assertTrue(equalTo.eval(), "11 < 12 <= 12");
-
- RowData indexRow4 = intIndexRow(10, 11);
- equalTo.bindColStats(indexRow4, queryFields(2), rExpr);
- assertFalse(equalTo.eval(), "11 < 12");
-
- RowData indexRow5 = intIndexRow(13, 14);
- equalTo.bindColStats(indexRow5, queryFields(2), rExpr);
- assertFalse(equalTo.eval(), "12 < 13");
-
- RowData indexRow6 = intIndexRow(null, null);
- equalTo.bindColStats(indexRow6, queryFields(2), rExpr);
- assertFalse(equalTo.eval(), "12 <> null");
-
- equalTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
- assertFalse(equalTo.eval(), "It is not possible to test for NULL values
with '=' operator");
- }
-
- @Test
- void testNotEqualTo() {
- ExpressionEvaluator.NotEqualTo notEqualTo =
ExpressionEvaluator.NotEqualTo.getInstance();
- FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
- ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
- RowData indexRow1 = intIndexRow(11, 13);
- notEqualTo.bindFieldReference(rExpr)
- .bindVal(vExpr)
- .bindColStats(indexRow1, queryFields(2), rExpr);
- assertTrue(notEqualTo.eval(), "11 <> 12 && 12 <> 13");
-
- RowData indexRow2 = intIndexRow(12, 13);
- notEqualTo.bindColStats(indexRow2, queryFields(2), rExpr);
- assertTrue(notEqualTo.eval(), "12 <> 13");
-
- RowData indexRow3 = intIndexRow(11, 12);
- notEqualTo.bindColStats(indexRow3, queryFields(2), rExpr);
- assertTrue(notEqualTo.eval(), "11 <> 12");
-
- RowData indexRow4 = intIndexRow(10, 11);
- notEqualTo.bindColStats(indexRow4, queryFields(2), rExpr);
- assertTrue(notEqualTo.eval(), "10 <> 12 and 11 < 12");
-
- RowData indexRow5 = intIndexRow(13, 14);
- notEqualTo.bindColStats(indexRow5, queryFields(2), rExpr);
- assertTrue(notEqualTo.eval(), "12 <> 13 and 12 <> 14");
-
- RowData indexRow6 = intIndexRow(null, null);
- notEqualTo.bindColStats(indexRow6, queryFields(2), rExpr);
- assertTrue(notEqualTo.eval(), "12 <> null");
-
- notEqualTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
- assertFalse(notEqualTo.eval(), "It is not possible to test for NULL values
with '<>' operator");
- }
-
- @Test
- void testIsNull() {
- ExpressionEvaluator.IsNull isNull =
ExpressionEvaluator.IsNull.getInstance();
- FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
-
- RowData indexRow1 = intIndexRow(11, 13);
- isNull.bindFieldReference(rExpr)
- .bindColStats(indexRow1, queryFields(2), rExpr);
- assertTrue(isNull.eval(), "2 nulls");
-
- RowData indexRow2 = intIndexRow(12, 13, 0L);
- isNull.bindColStats(indexRow2, queryFields(2), rExpr);
- assertFalse(isNull.eval(), "0 nulls");
- }
-
- @Test
- void testIsNotNull() {
- ExpressionEvaluator.IsNotNull isNotNull =
ExpressionEvaluator.IsNotNull.getInstance();
- FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
-
- RowData indexRow1 = intIndexRow(11, 13);
- isNotNull.bindFieldReference(rExpr)
- .bindColStats(indexRow1, queryFields(2), rExpr);
- assertTrue(isNotNull.eval(), "min 11 is not null");
-
- RowData indexRow2 = intIndexRow(null, null, 0L);
- isNotNull.bindColStats(indexRow2, queryFields(2), rExpr);
- assertTrue(isNotNull.eval(), "min is null and 0 nulls");
- }
-
- @Test
- void testLessThan() {
- ExpressionEvaluator.LessThan lessThan =
ExpressionEvaluator.LessThan.getInstance();
- FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
- ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
- RowData indexRow1 = intIndexRow(11, 13);
- lessThan.bindFieldReference(rExpr)
- .bindVal(vExpr)
- .bindColStats(indexRow1, queryFields(2), rExpr);
- assertTrue(lessThan.eval(), "12 < 13");
-
- RowData indexRow2 = intIndexRow(12, 13);
- lessThan.bindColStats(indexRow2, queryFields(2), rExpr);
- assertFalse(lessThan.eval(), "min 12 = 12");
-
- RowData indexRow3 = intIndexRow(11, 12);
- lessThan.bindColStats(indexRow3, queryFields(2), rExpr);
- assertTrue(lessThan.eval(), "11 < 12");
-
- RowData indexRow4 = intIndexRow(10, 11);
- lessThan.bindColStats(indexRow4, queryFields(2), rExpr);
- assertTrue(lessThan.eval(), "11 < 12");
-
- RowData indexRow5 = intIndexRow(13, 14);
- lessThan.bindColStats(indexRow5, queryFields(2), rExpr);
- assertFalse(lessThan.eval(), "12 < min 13");
-
- RowData indexRow6 = intIndexRow(null, null);
- lessThan.bindColStats(indexRow6, queryFields(2), rExpr);
- assertFalse(lessThan.eval(), "12 <> null");
-
- lessThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
- assertFalse(lessThan.eval(), "It is not possible to test for NULL values
with '<' operator");
- }
-
- @Test
- void testGreaterThan() {
- ExpressionEvaluator.GreaterThan greaterThan =
ExpressionEvaluator.GreaterThan.getInstance();
- FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
- ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
- RowData indexRow1 = intIndexRow(11, 13);
- greaterThan.bindFieldReference(rExpr)
- .bindVal(vExpr)
- .bindColStats(indexRow1, queryFields(2), rExpr);
- assertTrue(greaterThan.eval(), "12 < 13");
-
- RowData indexRow2 = intIndexRow(12, 13);
- greaterThan.bindColStats(indexRow2, queryFields(2), rExpr);
- assertTrue(greaterThan.eval(), "12 < 13");
-
- RowData indexRow3 = intIndexRow(11, 12);
- greaterThan.bindColStats(indexRow3, queryFields(2), rExpr);
- assertFalse(greaterThan.eval(), "max 12 = 12");
-
- RowData indexRow4 = intIndexRow(10, 11);
- greaterThan.bindColStats(indexRow4, queryFields(2), rExpr);
- assertFalse(greaterThan.eval(), "max 11 < 12");
-
- RowData indexRow5 = intIndexRow(13, 14);
- greaterThan.bindColStats(indexRow5, queryFields(2), rExpr);
- assertTrue(greaterThan.eval(), "12 < 13");
-
- RowData indexRow6 = intIndexRow(null, null);
- greaterThan.bindColStats(indexRow6, queryFields(2), rExpr);
- assertFalse(greaterThan.eval(), "12 <> null");
-
- greaterThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
- assertFalse(greaterThan.eval(), "It is not possible to test for NULL
values with '>' operator");
- }
-
- @Test
- void testLessThanOrEqual() {
- ExpressionEvaluator.LessThanOrEqual lessThanOrEqual =
ExpressionEvaluator.LessThanOrEqual.getInstance();
- FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
- ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
- RowData indexRow1 = intIndexRow(11, 13);
- lessThanOrEqual.bindFieldReference(rExpr)
- .bindVal(vExpr)
- .bindColStats(indexRow1, queryFields(2), rExpr);
- assertTrue(lessThanOrEqual.eval(), "11 < 12");
-
- RowData indexRow2 = intIndexRow(12, 13);
- lessThanOrEqual.bindColStats(indexRow2, queryFields(2), rExpr);
- assertTrue(lessThanOrEqual.eval(), "min 12 = 12");
-
- RowData indexRow3 = intIndexRow(11, 12);
- lessThanOrEqual.bindColStats(indexRow3, queryFields(2), rExpr);
- assertTrue(lessThanOrEqual.eval(), "max 12 = 12");
-
- RowData indexRow4 = intIndexRow(10, 11);
- lessThanOrEqual.bindColStats(indexRow4, queryFields(2), rExpr);
- assertTrue(lessThanOrEqual.eval(), "max 11 < 12");
-
- RowData indexRow5 = intIndexRow(13, 14);
- lessThanOrEqual.bindColStats(indexRow5, queryFields(2), rExpr);
- assertFalse(lessThanOrEqual.eval(), "12 < 13");
-
- RowData indexRow6 = intIndexRow(null, null);
- lessThanOrEqual.bindColStats(indexRow6, queryFields(2), rExpr);
- assertFalse(lessThanOrEqual.eval(), "12 <> null");
-
- lessThanOrEqual.bindVal(new ValueLiteralExpression(null, DataTypes.INT()));
- assertFalse(lessThanOrEqual.eval(), "It is not possible to test for NULL
values with '<=' operator");
- }
-
- @Test
- void testGreaterThanOrEqual() {
- ExpressionEvaluator.GreaterThanOrEqual greaterThanOrEqual =
ExpressionEvaluator.GreaterThanOrEqual.getInstance();
- FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
- ValueLiteralExpression vExpr = new ValueLiteralExpression(12);
-
- RowData indexRow1 = intIndexRow(11, 13);
- greaterThanOrEqual.bindFieldReference(rExpr)
- .bindVal(vExpr)
- .bindColStats(indexRow1, queryFields(2), rExpr);
- assertTrue(greaterThanOrEqual.eval(), "12 < 13");
-
- RowData indexRow2 = intIndexRow(12, 13);
- greaterThanOrEqual.bindColStats(indexRow2, queryFields(2), rExpr);
- assertTrue(greaterThanOrEqual.eval(), "min 12 = 12");
-
- RowData indexRow3 = intIndexRow(11, 12);
- greaterThanOrEqual.bindColStats(indexRow3, queryFields(2), rExpr);
- assertTrue(greaterThanOrEqual.eval(), "max 12 = 12");
-
- RowData indexRow4 = intIndexRow(10, 11);
- greaterThanOrEqual.bindColStats(indexRow4, queryFields(2), rExpr);
- assertFalse(greaterThanOrEqual.eval(), "max 11 < 12");
-
- RowData indexRow5 = intIndexRow(13, 14);
- greaterThanOrEqual.bindColStats(indexRow5, queryFields(2), rExpr);
- assertTrue(greaterThanOrEqual.eval(), "12 < 13");
-
- RowData indexRow6 = intIndexRow(null, null);
- greaterThanOrEqual.bindColStats(indexRow6, queryFields(2), rExpr);
- assertFalse(greaterThanOrEqual.eval(), "12 <> null");
-
- greaterThanOrEqual.bindVal(new ValueLiteralExpression(null,
DataTypes.INT()));
- assertFalse(greaterThanOrEqual.eval(), "It is not possible to test for
NULL values with '>=' operator");
- }
-
- @Test
- void testIn() {
- ExpressionEvaluator.In in = ExpressionEvaluator.In.getInstance();
- FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
-
- RowData indexRow1 = intIndexRow(11, 13);
- in.bindFieldReference(rExpr)
- .bindColStats(indexRow1, queryFields(2), rExpr);
- in.bindVals(12);
- assertTrue(in.eval(), "11 < 12 < 13");
-
- RowData indexRow2 = intIndexRow(12, 13);
- in.bindColStats(indexRow2, queryFields(2), rExpr);
- assertTrue(in.eval(), "min 12 = 12");
-
- RowData indexRow3 = intIndexRow(11, 12);
- in.bindColStats(indexRow3, queryFields(2), rExpr);
- assertTrue(in.eval(), "max 12 = 12");
-
- RowData indexRow4 = intIndexRow(10, 11);
- in.bindColStats(indexRow4, queryFields(2), rExpr);
- assertFalse(in.eval(), "max 11 < 12");
-
- RowData indexRow5 = intIndexRow(13, 14);
- in.bindColStats(indexRow5, queryFields(2), rExpr);
- assertFalse(in.eval(), "12 < 13");
-
- RowData indexRow6 = intIndexRow(null, null);
- in.bindColStats(indexRow6, queryFields(2), rExpr);
- assertFalse(in.eval(), "12 <> null");
-
- in.bindVals((Object) null);
- assertFalse(in.eval(), "It is not possible to test for NULL values with
'in' operator");
- }
-
- @Test
- void testAlwaysFalse() {
- FieldReferenceExpression ref = new FieldReferenceExpression("f_int",
DataTypes.INT(), 2, 2);
- ValueLiteralExpression nullLiteral = new ValueLiteralExpression(null,
DataTypes.INT());
- RowData indexRow = intIndexRow(11, 13);
- RowType.RowField[] queryFields = queryFields(2);
- FunctionDefinition[] funDefs = new FunctionDefinition[] {
- BuiltInFunctionDefinitions.EQUALS,
- BuiltInFunctionDefinitions.NOT_EQUALS,
- BuiltInFunctionDefinitions.LESS_THAN,
- BuiltInFunctionDefinitions.GREATER_THAN,
- BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
- BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
- BuiltInFunctionDefinitions.IN};
- for (FunctionDefinition funDef : funDefs) {
- CallExpression expr =
- new CallExpression(
- funDef,
- Arrays.asList(ref, nullLiteral),
- DataTypes.BOOLEAN());
- // always return false if the literal value is null
- assertFalse(bindCall(expr, indexRow, queryFields).eval());
- }
- }
-
- private static RowData intIndexRow(Integer minVal, Integer maxVal) {
- return intIndexRow(minVal, maxVal, 2L);
- }
-
- private static RowData intIndexRow(Integer minVal, Integer maxVal, Long
nullCnt) {
- return indexRow(StringData.fromString("f1"), 100L,
- minVal, maxVal, nullCnt,
- StringData.fromString("1"), StringData.fromString("100"), 5L,
- TimestampData.fromEpochMillis(1), TimestampData.fromEpochMillis(100),
3L);
- }
-
- private static RowData indexRow(Object... fields) {
- return TestData.insertRow(INDEX_ROW_TYPE, fields);
- }
-
- private static RowType.RowField[] queryFields(int... pos) {
- List<RowType.RowField> fields = ((RowType)
ROW_DATA_TYPE.getLogicalType()).getFields();
- return
Arrays.stream(pos).mapToObj(fields::get).toArray(RowType.RowField[]::new);
- }
-}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 5aba8885a64..4c34a318bfd 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1583,6 +1583,13 @@ public class ITTestHoodieDataSource {
+ "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
+ "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+ // filter by in expression
+ List<Row> result4 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1 where uuid in ('id6', 'id7',
'id8')").execute().collect());
+ assertRowsEquals(result4, "["
+ + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
+ + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+ + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}
@Test