This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 5be4b8ebcf17700fc372fe0a69c4007ee932cf6b
Author: Jane Chan <[email protected]>
AuthorDate: Mon Mar 28 14:49:12 2022 +0800

    [FLINK-26877] Introduce auto type inference for filter push down
    
    This closes #64
---
 .../flink/table/store/file/predicate/And.java      |  19 +++
 .../flink/table/store/file/predicate/Equal.java    |  19 +++
 .../table/store/file/predicate/GreaterOrEqual.java |  19 +++
 .../table/store/file/predicate/GreaterThan.java    |  19 +++
 .../table/store/file/predicate/IsNotNull.java      |  19 +++
 .../flink/table/store/file/predicate/IsNull.java   |  19 +++
 .../table/store/file/predicate/LessOrEqual.java    |  19 +++
 .../flink/table/store/file/predicate/LessThan.java |  19 +++
 .../flink/table/store/file/predicate/Literal.java  |  18 +++
 .../flink/table/store/file/predicate/NotEqual.java |  19 +++
 .../flink/table/store/file/predicate/Or.java       |  19 +++
 .../store/file/predicate/PredicateConverter.java   |  64 +++++----
 .../file/predicate/PredicateConverterTest.java     | 148 +++++++++++++++++++++
 13 files changed, 396 insertions(+), 24 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
index 5c2817a..8744df9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
 
+import java.util.Objects;
+
 /** A {@link Predicate} to eval and. */
 public class And implements Predicate {
 
@@ -42,4 +44,21 @@ public class And implements Predicate {
     public boolean test(long rowCount, FieldStats[] fieldStats) {
         return predicate1.test(rowCount, fieldStats) && predicate2.test(rowCount, fieldStats);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof And)) {
+            return false;
+        }
+        And and = (And) o;
+        return predicate1.equals(and.predicate1) && predicate2.equals(and.predicate2);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(predicate1, predicate2);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
index 56fc91d..489fab0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
 
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A {@link Predicate} to eval equal. */
@@ -51,4 +53,21 @@ public class Equal implements Predicate {
         return literal.compareValueTo(stats.minValue()) >= 0
                 && literal.compareValueTo(stats.maxValue()) <= 0;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof Equal)) {
+            return false;
+        }
+        Equal equal = (Equal) o;
+        return index == equal.index && literal.equals(equal.literal);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index, literal);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
index 79a0cdc..4812476 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
 
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A {@link Predicate} to eval greater or equal. */
@@ -50,4 +52,21 @@ public class GreaterOrEqual implements Predicate {
         }
         return literal.compareValueTo(stats.maxValue()) <= 0;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof GreaterOrEqual)) {
+            return false;
+        }
+        GreaterOrEqual that = (GreaterOrEqual) o;
+        return index == that.index && literal.equals(that.literal);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index, literal);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
index 81ddc26..535cbc5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
 
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A {@link Predicate} to eval greater. */
@@ -50,4 +52,21 @@ public class GreaterThan implements Predicate {
         }
         return literal.compareValueTo(stats.maxValue()) < 0;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof GreaterThan)) {
+            return false;
+        }
+        GreaterThan that = (GreaterThan) o;
+        return index == that.index && literal.equals(that.literal);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index, literal);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
index 9de7d5b..3d53724 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
 
+import java.util.Objects;
+
 /** A {@link Predicate} to eval is not null. */
 public class IsNotNull implements Predicate {
 
@@ -40,4 +42,21 @@ public class IsNotNull implements Predicate {
     public boolean test(long rowCount, FieldStats[] fieldStats) {
         return fieldStats[index].nullCount() < rowCount;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof IsNotNull)) {
+            return false;
+        }
+        IsNotNull isNotNull = (IsNotNull) o;
+        return index == isNotNull.index;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
index 5e9461a..b863682 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
 
+import java.util.Objects;
+
 /** A {@link Predicate} to eval is null. */
 public class IsNull implements Predicate {
 
@@ -40,4 +42,21 @@ public class IsNull implements Predicate {
     public boolean test(long rowCount, FieldStats[] fieldStats) {
         return fieldStats[index].nullCount() > 0;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof IsNull)) {
+            return false;
+        }
+        IsNull isNull = (IsNull) o;
+        return index == isNull.index;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
index 9a94190..98e24d5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
 
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A {@link Predicate} to eval less or equal. */
@@ -50,4 +52,21 @@ public class LessOrEqual implements Predicate {
         }
         return literal.compareValueTo(stats.minValue()) >= 0;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof LessOrEqual)) {
+            return false;
+        }
+        LessOrEqual that = (LessOrEqual) o;
+        return index == that.index && literal.equals(that.literal);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index, literal);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
index f8fda69..b950dd7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
 
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A {@link Predicate} to eval less. */
@@ -50,4 +52,21 @@ public class LessThan implements Predicate {
         }
         return literal.compareValueTo(stats.minValue()) > 0;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof LessThan)) {
+            return false;
+        }
+        LessThan lessThan = (LessThan) o;
+        return index == lessThan.index && literal.equals(lessThan.literal);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index, literal);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
index eed1b82..686107e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.Objects;
 
 /** A serializable literal class. */
 public class Literal implements Serializable {
@@ -73,4 +74,21 @@ public class Literal implements Serializable {
         in.defaultReadObject();
         value = InternalSerializers.create(type).deserialize(new DataInputViewStreamWrapper(in));
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof Literal)) {
+            return false;
+        }
+        Literal literal = (Literal) o;
+        return type.equals(literal.type) && value.equals(literal.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, value);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
index 2c2136a..4d6a924 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
 
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A {@link Predicate} to eval not equal. */
@@ -48,4 +50,21 @@ public class NotEqual implements Predicate {
         return literal.compareValueTo(stats.minValue()) != 0
                 || literal.compareValueTo(stats.maxValue()) != 0;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof NotEqual)) {
+            return false;
+        }
+        NotEqual notEqual = (NotEqual) o;
+        return index == notEqual.index && literal.equals(notEqual.literal);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index, literal);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
index 02a79b1..5a5f3d2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
 
 import org.apache.flink.table.store.file.stats.FieldStats;
 
+import java.util.Objects;
+
 /** A {@link Predicate} to eval or. */
 public class Or implements Predicate {
 
@@ -42,4 +44,21 @@ public class Or implements Predicate {
     public boolean test(long rowCount, FieldStats[] fieldStats) {
         return predicate1.test(rowCount, fieldStats) || predicate2.test(rowCount, fieldStats);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof Or)) {
+            return false;
+        }
+        Or or = (Or) o;
+        return predicate1.equals(or.predicate1) && predicate2.equals(or.predicate2);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(predicate1, predicate2);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index e625552..f845dec 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -40,6 +40,7 @@ import java.util.Optional;
 import java.util.function.BiFunction;
 
 import static org.apache.flink.table.data.conversion.DataStructureConverters.getConverter;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
 
 /** Convert {@link Expression} to {@link Predicate}. */
 public class PredicateConverter implements ExpressionVisitor<Predicate> {
@@ -86,10 +87,12 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
             return visitBiFunction(children, LessOrEqual::new, GreaterOrEqual::new);
         } else if (func == BuiltInFunctionDefinitions.IS_NULL) {
             return extractFieldReference(children.get(0))
+                    .map(FieldReferenceExpression::getFieldIndex)
                     .map(IsNull::new)
                     .orElseThrow(UnsupportedExpression::new);
         } else if (func == BuiltInFunctionDefinitions.IS_NOT_NULL) {
             return extractFieldReference(children.get(0))
+                    .map(FieldReferenceExpression::getFieldIndex)
                     .map(IsNotNull::new)
                     .orElseThrow(UnsupportedExpression::new);
         }
@@ -103,19 +106,19 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
             List<Expression> children,
             BiFunction<Integer, Literal, Predicate> visit1,
             BiFunction<Integer, Literal, Predicate> visit2) {
-        Optional<Integer> field = extractFieldReference(children.get(0));
+        Optional<FieldReferenceExpression> fieldRefExpr = extractFieldReference(children.get(0));
         Optional<Literal> literal;
-        if (field.isPresent()) {
-            literal = extractLiteral(children.get(1));
+        if (fieldRefExpr.isPresent()) {
+            literal = extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(1));
             if (literal.isPresent()) {
-                return visit1.apply(field.get(), literal.get());
+                return visit1.apply(fieldRefExpr.get().getFieldIndex(), literal.get());
             }
         } else {
-            field = extractFieldReference(children.get(1));
-            if (field.isPresent()) {
-                literal = extractLiteral(children.get(0));
+            fieldRefExpr = extractFieldReference(children.get(1));
+            if (fieldRefExpr.isPresent()) {
+                literal = extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(0));
                 if (literal.isPresent()) {
-                    return visit2.apply(field.get(), literal.get());
+                    return visit2.apply(fieldRefExpr.get().getFieldIndex(), literal.get());
                 }
             }
         }
@@ -123,30 +126,43 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
         throw new UnsupportedExpression();
     }
 
-    private Optional<Integer> extractFieldReference(Expression expression) {
+    private Optional<FieldReferenceExpression> extractFieldReference(Expression expression) {
         if (expression instanceof FieldReferenceExpression) {
-            int reference = ((FieldReferenceExpression) expression).getFieldIndex();
-            return Optional.of(reference);
+            return Optional.of((FieldReferenceExpression) expression);
         }
         return Optional.empty();
     }
 
-    private Optional<Literal> extractLiteral(Expression expression) {
+    private Optional<Literal> extractLiteral(DataType expectedType, Expression expression) {
+        LogicalType expectedLogicalType = expectedType.getLogicalType();
+        if (!supportsPredicate(expectedLogicalType)) {
+            return Optional.empty();
+        }
+        Literal literal = null;
         if (expression instanceof ValueLiteralExpression) {
             ValueLiteralExpression valueExpression = (ValueLiteralExpression) expression;
-            DataType type = valueExpression.getOutputDataType();
-            return supportsPredicate(type.getLogicalType())
-                    ? Optional.of(
-                            new Literal(
-                                    type.getLogicalType(),
-                                    getConverter(type)
-                                            .toInternalOrNull(
-                                                    valueExpression
-                                                            .getValueAs(type.getConversionClass())
-                                                            .get())))
-                    : Optional.empty();
+            DataType actualType = valueExpression.getOutputDataType();
+            LogicalType actualLogicalType = actualType.getLogicalType();
+            Optional<?> valueOpt = valueExpression.getValueAs(actualType.getConversionClass());
+            if (!valueOpt.isPresent()) {
+                return Optional.empty();
+            }
+            Object value = valueOpt.get();
+            if (actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())) {
+                literal =
+                        new Literal(
+                                expectedLogicalType,
+                                getConverter(expectedType).toInternalOrNull(value));
+            } else if (supportsImplicitCast(actualLogicalType, expectedLogicalType)) {
+                try {
+                    value = TypeUtils.castFromString(value.toString(), expectedLogicalType);
+                    literal = new Literal(expectedLogicalType, value);
+                } catch (Exception ignored) {
+                    // ignore here, let #visit throw UnsupportedExpression
+                }
+            }
         }
-        return Optional.empty();
+        return literal == null ? Optional.empty() : Optional.of(literal);
     }
 
     private boolean supportsPredicate(LogicalType type) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
new file mode 100644
index 0000000..8cd5717
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link PredicateConverter}. */
+public class PredicateConverterTest {
+
+    private static final PredicateConverter CONVERTER = new PredicateConverter();
+
+    @MethodSource("provideResolvedExpression")
+    @ParameterizedTest
+    public void testVisitAndAutoTypeInference(ResolvedExpression expression, Predicate expected) {
+        if (expression instanceof CallExpression) {
+            assertThat(CONVERTER.visit((CallExpression) expression)).isEqualTo(expected);
+        } else {
+            assertThatThrownBy(() -> CONVERTER.visit(expression))
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessageContaining("Unsupported expression");
+        }
+    }
+
+    public static Stream<Arguments> provideResolvedExpression() {
+        FieldReferenceExpression longRefExpr =
+                new FieldReferenceExpression("long1", DataTypes.BIGINT(), 0, 0);
+        ValueLiteralExpression intLitExpr = new ValueLiteralExpression(10);
+        Literal longLit = new Literal(DataTypes.BIGINT().getLogicalType(), 10L);
+
+        FieldReferenceExpression doubleRefExpr =
+                new FieldReferenceExpression("double1", DataTypes.DOUBLE(), 0, 1);
+        ValueLiteralExpression floatLitExpr = new ValueLiteralExpression(3.14f);
+        Literal doubleLit = new Literal(DataTypes.DOUBLE().getLogicalType(), 3.14d);
+
+        return Stream.of(
+                Arguments.of(longRefExpr, null),
+                Arguments.of(intLitExpr, null),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.IS_NULL,
+                                Collections.singletonList(longRefExpr),
+                                DataTypes.BOOLEAN()),
+                        new IsNull(0)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.IS_NOT_NULL,
+                                Collections.singletonList(doubleRefExpr),
+                                DataTypes.BOOLEAN()),
+                        new IsNotNull(1)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                // test literal on left
+                                Arrays.asList(intLitExpr, longRefExpr),
+                                DataTypes.BOOLEAN()),
+                        new Equal(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.NOT_EQUALS,
+                                Arrays.asList(longRefExpr, intLitExpr),
+                                DataTypes.BOOLEAN()),
+                        new NotEqual(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.GREATER_THAN,
+                                Arrays.asList(longRefExpr, intLitExpr),
+                                DataTypes.BOOLEAN()),
+                        new GreaterThan(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
+                                Arrays.asList(longRefExpr, intLitExpr),
+                                DataTypes.BOOLEAN()),
+                        new GreaterOrEqual(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.LESS_THAN,
+                                Arrays.asList(longRefExpr, intLitExpr),
+                                DataTypes.BOOLEAN()),
+                        new LessThan(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+                                Arrays.asList(longRefExpr, intLitExpr),
+                                DataTypes.BOOLEAN()),
+                        new LessOrEqual(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.AND,
+                                Arrays.asList(
+                                        CallExpression.permanent(
+                                                BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+                                                Arrays.asList(longRefExpr, intLitExpr),
+                                                DataTypes.BOOLEAN()),
+                                        CallExpression.permanent(
+                                                BuiltInFunctionDefinitions.EQUALS,
+                                                Arrays.asList(doubleRefExpr, floatLitExpr),
+                                                DataTypes.BOOLEAN())),
+                                DataTypes.BOOLEAN()),
+                        new And(new LessOrEqual(0, longLit), new Equal(1, doubleLit))),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.OR,
+                                Arrays.asList(
+                                        CallExpression.permanent(
+                                                BuiltInFunctionDefinitions.NOT_EQUALS,
+                                                Arrays.asList(longRefExpr, intLitExpr),
+                                                DataTypes.BOOLEAN()),
+                                        CallExpression.permanent(
+                                                BuiltInFunctionDefinitions.EQUALS,
+                                                Arrays.asList(doubleRefExpr, floatLitExpr),
+                                                DataTypes.BOOLEAN())),
+                                DataTypes.BOOLEAN()),
+                        new Or(new NotEqual(0, longLit), new Equal(1, doubleLit))));
+    }
+}

Reply via email to