This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 61dff14 [FLINK-26877] Introduce auto type inference for filter push down
61dff14 is described below
commit 61dff144a4a029da25bdbfa06017cbfc3b5d2cb5
Author: Jane Chan <[email protected]>
AuthorDate: Mon Mar 28 14:49:12 2022 +0800
[FLINK-26877] Introduce auto type inference for filter push down
This closes #64
---
.../flink/table/store/file/predicate/And.java | 19 +++
.../flink/table/store/file/predicate/Equal.java | 19 +++
.../table/store/file/predicate/GreaterOrEqual.java | 19 +++
.../table/store/file/predicate/GreaterThan.java | 19 +++
.../table/store/file/predicate/IsNotNull.java | 19 +++
.../flink/table/store/file/predicate/IsNull.java | 19 +++
.../table/store/file/predicate/LessOrEqual.java | 19 +++
.../flink/table/store/file/predicate/LessThan.java | 19 +++
.../flink/table/store/file/predicate/Literal.java | 18 +++
.../flink/table/store/file/predicate/NotEqual.java | 19 +++
.../flink/table/store/file/predicate/Or.java | 19 +++
.../store/file/predicate/PredicateConverter.java | 64 +++++----
.../file/predicate/PredicateConverterTest.java | 148 +++++++++++++++++++++
13 files changed, 396 insertions(+), 24 deletions(-)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
index 5c2817a..8744df9 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import java.util.Objects;
+
/** A {@link Predicate} to eval and. */
public class And implements Predicate {
@@ -42,4 +44,21 @@ public class And implements Predicate {
public boolean test(long rowCount, FieldStats[] fieldStats) {
return predicate1.test(rowCount, fieldStats) &&
predicate2.test(rowCount, fieldStats);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof And)) {
+ return false;
+ }
+ And and = (And) o;
+ return predicate1.equals(and.predicate1) && predicate2.equals(and.predicate2);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(predicate1, predicate2);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
index 56fc91d..489fab0 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import java.util.Objects;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A {@link Predicate} to eval equal. */
@@ -51,4 +53,21 @@ public class Equal implements Predicate {
return literal.compareValueTo(stats.minValue()) >= 0
&& literal.compareValueTo(stats.maxValue()) <= 0;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Equal)) {
+ return false;
+ }
+ Equal equal = (Equal) o;
+ return index == equal.index && literal.equals(equal.literal);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index, literal);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
index 79a0cdc..4812476 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import java.util.Objects;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A {@link Predicate} to eval greater or equal. */
@@ -50,4 +52,21 @@ public class GreaterOrEqual implements Predicate {
}
return literal.compareValueTo(stats.maxValue()) <= 0;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof GreaterOrEqual)) {
+ return false;
+ }
+ GreaterOrEqual that = (GreaterOrEqual) o;
+ return index == that.index && literal.equals(that.literal);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index, literal);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
index 81ddc26..535cbc5 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import java.util.Objects;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A {@link Predicate} to eval greater. */
@@ -50,4 +52,21 @@ public class GreaterThan implements Predicate {
}
return literal.compareValueTo(stats.maxValue()) < 0;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof GreaterThan)) {
+ return false;
+ }
+ GreaterThan that = (GreaterThan) o;
+ return index == that.index && literal.equals(that.literal);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index, literal);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
index 9de7d5b..3d53724 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import java.util.Objects;
+
/** A {@link Predicate} to eval is not null. */
public class IsNotNull implements Predicate {
@@ -40,4 +42,21 @@ public class IsNotNull implements Predicate {
public boolean test(long rowCount, FieldStats[] fieldStats) {
return fieldStats[index].nullCount() < rowCount;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IsNotNull)) {
+ return false;
+ }
+ IsNotNull isNotNull = (IsNotNull) o;
+ return index == isNotNull.index;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
index 5e9461a..b863682 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import java.util.Objects;
+
/** A {@link Predicate} to eval is null. */
public class IsNull implements Predicate {
@@ -40,4 +42,21 @@ public class IsNull implements Predicate {
public boolean test(long rowCount, FieldStats[] fieldStats) {
return fieldStats[index].nullCount() > 0;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IsNull)) {
+ return false;
+ }
+ IsNull isNull = (IsNull) o;
+ return index == isNull.index;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
index 9a94190..98e24d5 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import java.util.Objects;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A {@link Predicate} to eval less or equal. */
@@ -50,4 +52,21 @@ public class LessOrEqual implements Predicate {
}
return literal.compareValueTo(stats.minValue()) >= 0;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof LessOrEqual)) {
+ return false;
+ }
+ LessOrEqual that = (LessOrEqual) o;
+ return index == that.index && literal.equals(that.literal);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index, literal);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
index f8fda69..b950dd7 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import java.util.Objects;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A {@link Predicate} to eval less. */
@@ -50,4 +52,21 @@ public class LessThan implements Predicate {
}
return literal.compareValueTo(stats.minValue()) > 0;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof LessThan)) {
+ return false;
+ }
+ LessThan lessThan = (LessThan) o;
+ return index == lessThan.index && literal.equals(lessThan.literal);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index, literal);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
index eed1b82..686107e 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.Objects;
/** A serializable literal class. */
public class Literal implements Serializable {
@@ -73,4 +74,21 @@ public class Literal implements Serializable {
in.defaultReadObject();
value = InternalSerializers.create(type).deserialize(new
DataInputViewStreamWrapper(in));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Literal)) {
+ return false;
+ }
+ Literal literal = (Literal) o;
+ return type.equals(literal.type) && value.equals(literal.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, value);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
index 2c2136a..4d6a924 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import java.util.Objects;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A {@link Predicate} to eval not equal. */
@@ -48,4 +50,21 @@ public class NotEqual implements Predicate {
return literal.compareValueTo(stats.minValue()) != 0
|| literal.compareValueTo(stats.maxValue()) != 0;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof NotEqual)) {
+ return false;
+ }
+ NotEqual notEqual = (NotEqual) o;
+ return index == notEqual.index && literal.equals(notEqual.literal);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index, literal);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
index 02a79b1..5a5f3d2 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.table.store.file.stats.FieldStats;
+import java.util.Objects;
+
/** A {@link Predicate} to eval or. */
public class Or implements Predicate {
@@ -42,4 +44,21 @@ public class Or implements Predicate {
public boolean test(long rowCount, FieldStats[] fieldStats) {
return predicate1.test(rowCount, fieldStats) ||
predicate2.test(rowCount, fieldStats);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Or)) {
+ return false;
+ }
+ Or or = (Or) o;
+ return predicate1.equals(or.predicate1) && predicate2.equals(or.predicate2);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(predicate1, predicate2);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index e625552..f845dec 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -40,6 +40,7 @@ import java.util.Optional;
import java.util.function.BiFunction;
import static
org.apache.flink.table.data.conversion.DataStructureConverters.getConverter;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
/** Convert {@link Expression} to {@link Predicate}. */
public class PredicateConverter implements ExpressionVisitor<Predicate> {
@@ -86,10 +87,12 @@ public class PredicateConverter implements
ExpressionVisitor<Predicate> {
return visitBiFunction(children, LessOrEqual::new,
GreaterOrEqual::new);
} else if (func == BuiltInFunctionDefinitions.IS_NULL) {
return extractFieldReference(children.get(0))
+ .map(FieldReferenceExpression::getFieldIndex)
.map(IsNull::new)
.orElseThrow(UnsupportedExpression::new);
} else if (func == BuiltInFunctionDefinitions.IS_NOT_NULL) {
return extractFieldReference(children.get(0))
+ .map(FieldReferenceExpression::getFieldIndex)
.map(IsNotNull::new)
.orElseThrow(UnsupportedExpression::new);
}
@@ -103,19 +106,19 @@ public class PredicateConverter implements
ExpressionVisitor<Predicate> {
List<Expression> children,
BiFunction<Integer, Literal, Predicate> visit1,
BiFunction<Integer, Literal, Predicate> visit2) {
- Optional<Integer> field = extractFieldReference(children.get(0));
+ Optional<FieldReferenceExpression> fieldRefExpr = extractFieldReference(children.get(0));
Optional<Literal> literal;
- if (field.isPresent()) {
- literal = extractLiteral(children.get(1));
+ if (fieldRefExpr.isPresent()) {
+ literal = extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(1));
if (literal.isPresent()) {
- return visit1.apply(field.get(), literal.get());
+ return visit1.apply(fieldRefExpr.get().getFieldIndex(), literal.get());
}
} else {
- field = extractFieldReference(children.get(1));
- if (field.isPresent()) {
- literal = extractLiteral(children.get(0));
+ fieldRefExpr = extractFieldReference(children.get(1));
+ if (fieldRefExpr.isPresent()) {
+ literal = extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(0));
if (literal.isPresent()) {
- return visit2.apply(field.get(), literal.get());
+ return visit2.apply(fieldRefExpr.get().getFieldIndex(), literal.get());
}
}
}
@@ -123,30 +126,43 @@ public class PredicateConverter implements
ExpressionVisitor<Predicate> {
throw new UnsupportedExpression();
}
- private Optional<Integer> extractFieldReference(Expression expression) {
+ private Optional<FieldReferenceExpression> extractFieldReference(Expression expression) {
if (expression instanceof FieldReferenceExpression) {
- int reference = ((FieldReferenceExpression) expression).getFieldIndex();
- return Optional.of(reference);
+ return Optional.of((FieldReferenceExpression) expression);
}
return Optional.empty();
}
- private Optional<Literal> extractLiteral(Expression expression) {
+ private Optional<Literal> extractLiteral(DataType expectedType, Expression expression) {
+ LogicalType expectedLogicalType = expectedType.getLogicalType();
+ if (!supportsPredicate(expectedLogicalType)) {
+ return Optional.empty();
+ }
+ Literal literal = null;
if (expression instanceof ValueLiteralExpression) {
ValueLiteralExpression valueExpression = (ValueLiteralExpression)
expression;
- DataType type = valueExpression.getOutputDataType();
- return supportsPredicate(type.getLogicalType())
- ? Optional.of(
- new Literal(
- type.getLogicalType(),
- getConverter(type)
- .toInternalOrNull(
- valueExpression
- .getValueAs(type.getConversionClass())
- .get())))
- : Optional.empty();
+ DataType actualType = valueExpression.getOutputDataType();
+ LogicalType actualLogicalType = actualType.getLogicalType();
+ Optional<?> valueOpt = valueExpression.getValueAs(actualType.getConversionClass());
+ if (!valueOpt.isPresent()) {
+ return Optional.empty();
+ }
+ Object value = valueOpt.get();
+ if (actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())) {
+ literal =
+ new Literal(
+ expectedLogicalType,
+ getConverter(expectedType).toInternalOrNull(value));
+ } else if (supportsImplicitCast(actualLogicalType, expectedLogicalType)) {
+ try {
+ value = TypeUtils.castFromString(value.toString(), expectedLogicalType);
+ literal = new Literal(expectedLogicalType, value);
+ } catch (Exception ignored) {
+ // ignore here, let #visit throw UnsupportedExpression
+ }
+ }
}
- return Optional.empty();
+ return literal == null ? Optional.empty() : Optional.of(literal);
}
private boolean supportsPredicate(LogicalType type) {
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
new file mode 100644
index 0000000..8cd5717
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link PredicateConverter}. */
+public class PredicateConverterTest {
+
+ private static final PredicateConverter CONVERTER = new PredicateConverter();
+
+ @MethodSource("provideResolvedExpression")
+ @ParameterizedTest
+ public void testVisitAndAutoTypeInference(ResolvedExpression expression, Predicate expected) {
+ if (expression instanceof CallExpression) {
+ assertThat(CONVERTER.visit((CallExpression) expression)).isEqualTo(expected);
+ } else {
+ assertThatThrownBy(() -> CONVERTER.visit(expression))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Unsupported expression");
+ }
+ }
+
+ public static Stream<Arguments> provideResolvedExpression() {
+ FieldReferenceExpression longRefExpr =
+ new FieldReferenceExpression("long1", DataTypes.BIGINT(), 0, 0);
+ ValueLiteralExpression intLitExpr = new ValueLiteralExpression(10);
+ Literal longLit = new Literal(DataTypes.BIGINT().getLogicalType(), 10L);
+
+ FieldReferenceExpression doubleRefExpr =
+ new FieldReferenceExpression("double1", DataTypes.DOUBLE(), 0, 1);
+ ValueLiteralExpression floatLitExpr = new ValueLiteralExpression(3.14f);
+ Literal doubleLit = new Literal(DataTypes.DOUBLE().getLogicalType(), 3.14d);
+
+ return Stream.of(
+ Arguments.of(longRefExpr, null),
+ Arguments.of(intLitExpr, null),
+ Arguments.of(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.IS_NULL,
+ Collections.singletonList(longRefExpr),
+ DataTypes.BOOLEAN()),
+ new IsNull(0)),
+ Arguments.of(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.IS_NOT_NULL,
+ Collections.singletonList(doubleRefExpr),
+ DataTypes.BOOLEAN()),
+ new IsNotNull(1)),
+ Arguments.of(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.EQUALS,
+ // test literal on left
+ Arrays.asList(intLitExpr, longRefExpr),
+ DataTypes.BOOLEAN()),
+ new Equal(0, longLit)),
+ Arguments.of(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.NOT_EQUALS,
+ Arrays.asList(longRefExpr, intLitExpr),
+ DataTypes.BOOLEAN()),
+ new NotEqual(0, longLit)),
+ Arguments.of(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.GREATER_THAN,
+ Arrays.asList(longRefExpr, intLitExpr),
+ DataTypes.BOOLEAN()),
+ new GreaterThan(0, longLit)),
+ Arguments.of(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
+ Arrays.asList(longRefExpr, intLitExpr),
+ DataTypes.BOOLEAN()),
+ new GreaterOrEqual(0, longLit)),
+ Arguments.of(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.LESS_THAN,
+ Arrays.asList(longRefExpr, intLitExpr),
+ DataTypes.BOOLEAN()),
+ new LessThan(0, longLit)),
+ Arguments.of(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+ Arrays.asList(longRefExpr, intLitExpr),
+ DataTypes.BOOLEAN()),
+ new LessOrEqual(0, longLit)),
+ Arguments.of(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.AND,
+ Arrays.asList(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+ Arrays.asList(longRefExpr, intLitExpr),
+ DataTypes.BOOLEAN()),
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.EQUALS,
+ Arrays.asList(doubleRefExpr, floatLitExpr),
+ DataTypes.BOOLEAN())),
+ DataTypes.BOOLEAN()),
+ new And(new LessOrEqual(0, longLit), new Equal(1, doubleLit))),
+ Arguments.of(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.OR,
+ Arrays.asList(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.NOT_EQUALS,
+ Arrays.asList(longRefExpr, intLitExpr),
+ DataTypes.BOOLEAN()),
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.EQUALS,
+ Arrays.asList(doubleRefExpr, floatLitExpr),
+ DataTypes.BOOLEAN())),
+ DataTypes.BOOLEAN()),
+ new Or(new NotEqual(0, longLit), new Equal(1, doubleLit))));
+ }
+}