This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 82af5e6 [FLINK-25643] Introduce Predicate to file store
82af5e6 is described below
commit 82af5e63f0a6aded5bdeb7e147cc58721f7154aa
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 14 16:30:55 2022 +0800
[FLINK-25643] Introduce Predicate to file store
This closes #7
---
.../flink/table/store/file/predicate/And.java | 43 +++
.../flink/table/store/file/predicate/Equal.java | 52 ++++
.../table/store/file/predicate/GreaterOrEqual.java | 51 ++++
.../table/store/file/predicate/GreaterThan.java | 51 ++++
.../table/store/file/predicate/IsNotNull.java | 41 +++
.../flink/table/store/file/predicate/IsNull.java | 41 +++
.../table/store/file/predicate/LessOrEqual.java | 51 ++++
.../flink/table/store/file/predicate/LessThan.java | 51 ++++
.../flink/table/store/file/predicate/Literal.java | 76 +++++
.../flink/table/store/file/predicate/NotEqual.java | 49 +++
.../flink/table/store/file/predicate/Or.java | 43 +++
.../table/store/file/predicate/Predicate.java | 40 +++
.../store/file/predicate/PredicateConverter.java | 179 +++++++++++
.../table/store/file/predicate/PredicateTest.java | 334 +++++++++++++++++++++
14 files changed, 1102 insertions(+)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
new file mode 100644
index 0000000..b926902
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A {@link Predicate} that holds only when both of its child predicates hold. */
+public class And implements Predicate {
+
+    private final Predicate left;
+    private final Predicate right;
+
+    public And(Predicate predicate1, Predicate predicate2) {
+        this.left = predicate1;
+        this.right = predicate2;
+    }
+
+    /** Row-level test; the second child is consulted only if the first one matched. */
+    @Override
+    public boolean test(Object[] values) {
+        if (!left.test(values)) {
+            return false;
+        }
+        return right.test(values);
+    }
+
+    /** Stats-level test; the second child is consulted only if the first one may match. */
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        if (!left.test(rowCount, fieldStats)) {
+            return false;
+        }
+        return right.test(rowCount, fieldStats);
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
new file mode 100644
index 0000000..cfec2e1
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link Predicate} that matches when the field at a given position equals a literal. */
+public class Equal implements Predicate {
+
+    private final int index;
+    private final Literal literal;
+
+    public Equal(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        Object candidate = values[index];
+        if (candidate == null) {
+            // A null field never equals the literal.
+            return false;
+        }
+        return literal.compareValueTo(candidate) == 0;
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats columnStats = fieldStats[index];
+        if (columnStats.nullCount() == rowCount) {
+            // Every row is null in this column, so nothing can match.
+            return false;
+        }
+        // A match is possible only if the literal lies inside [min, max].
+        if (literal.compareValueTo(columnStats.minValue()) < 0) {
+            return false;
+        }
+        return literal.compareValueTo(columnStats.maxValue()) <= 0;
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
new file mode 100644
index 0000000..9188b5d
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Predicate} that matches when the field at a given position is greater than or equal
+ * to a literal.
+ */
+public class GreaterOrEqual implements Predicate {
+
+    private final int index;
+    private final Literal literal;
+
+    public GreaterOrEqual(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        Object candidate = values[index];
+        if (candidate == null) {
+            return false;
+        }
+        // "field >= literal" is evaluated from the literal's side: literal <= field.
+        return literal.compareValueTo(candidate) <= 0;
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats columnStats = fieldStats[index];
+        if (columnStats.nullCount() == rowCount) {
+            // Only nulls in this column, so no row can match.
+            return false;
+        }
+        // Some value can reach the literal only if the column maximum does.
+        return literal.compareValueTo(columnStats.maxValue()) <= 0;
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
new file mode 100644
index 0000000..25d24ac
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Predicate} that matches when the field at a given position is strictly greater than
+ * a literal.
+ */
+public class GreaterThan implements Predicate {
+
+    private final int index;
+    private final Literal literal;
+
+    public GreaterThan(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        Object candidate = values[index];
+        if (candidate == null) {
+            return false;
+        }
+        // "field > literal" is evaluated from the literal's side: literal < field.
+        return literal.compareValueTo(candidate) < 0;
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats columnStats = fieldStats[index];
+        if (columnStats.nullCount() == rowCount) {
+            // Only nulls in this column, so no row can match.
+            return false;
+        }
+        // Some value can exceed the literal only if the column maximum does.
+        return literal.compareValueTo(columnStats.maxValue()) < 0;
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
new file mode 100644
index 0000000..7cb9a3e
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A {@link Predicate} to eval is not null. */
+public class IsNotNull implements Predicate {
+
+ private final int index;
+
+ public IsNotNull(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public boolean test(Object[] values) {
+ return values[index] != null;
+ }
+
+ @Override
+ public boolean test(long rowCount, FieldStats[] fieldStats) {
+ return fieldStats[index].nullCount() < rowCount;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
new file mode 100644
index 0000000..e3df845
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A {@link Predicate} to eval is null. */
+public class IsNull implements Predicate {
+
+ private final int index;
+
+ public IsNull(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public boolean test(Object[] values) {
+ return values[index] == null;
+ }
+
+ @Override
+ public boolean test(long rowCount, FieldStats[] fieldStats) {
+ return fieldStats[index].nullCount() > 0;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
new file mode 100644
index 0000000..ed7695c
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Predicate} that matches when the field at a given position is less than or equal to
+ * a literal.
+ */
+public class LessOrEqual implements Predicate {
+
+    private final int index;
+    private final Literal literal;
+
+    public LessOrEqual(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        Object candidate = values[index];
+        if (candidate == null) {
+            return false;
+        }
+        // "field <= literal" is evaluated from the literal's side: literal >= field.
+        return literal.compareValueTo(candidate) >= 0;
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats columnStats = fieldStats[index];
+        if (columnStats.nullCount() == rowCount) {
+            // Only nulls in this column, so no row can match.
+            return false;
+        }
+        // Some value can stay at or below the literal only if the column minimum does.
+        return literal.compareValueTo(columnStats.minValue()) >= 0;
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
new file mode 100644
index 0000000..2e644c0
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Predicate} that matches when the field at a given position is strictly less than a
+ * literal.
+ */
+public class LessThan implements Predicate {
+
+    private final int index;
+    private final Literal literal;
+
+    public LessThan(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        Object candidate = values[index];
+        if (candidate == null) {
+            return false;
+        }
+        // "field < literal" is evaluated from the literal's side: literal > field.
+        return literal.compareValueTo(candidate) > 0;
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats columnStats = fieldStats[index];
+        if (columnStats.nullCount() == rowCount) {
+            // Only nulls in this column, so no row can match.
+            return false;
+        }
+        // Some value can fall below the literal only if the column minimum does.
+        return literal.compareValueTo(columnStats.minValue()) > 0;
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
new file mode 100644
index 0000000..b83e90d
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * A serializable literal: a value paired with its {@link LogicalType}.
+ *
+ * <p>The value itself is {@code transient}. It is written and read through the serializer that
+ * {@link InternalSerializers#create(LogicalType)} returns for {@link #type()}, right after the
+ * default (non-transient) fields.
+ */
+public class Literal implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final BytePrimitiveArrayComparator BINARY_COMPARATOR =
+            new BytePrimitiveArrayComparator(true);
+
+    private final LogicalType type;
+
+    private transient Object value;
+
+    public Literal(LogicalType type, Object value) {
+        this.type = type;
+        this.value = value;
+    }
+
+    public LogicalType type() {
+        return type;
+    }
+
+    public Object value() {
+        return value;
+    }
+
+    /**
+     * Compares this literal's value with {@code o}.
+     *
+     * @param o the value to compare against; it is cast to {@code byte[]} when this literal
+     *     holds a {@code byte[]}, otherwise it is handed to the value's {@code compareTo}
+     * @return negative, zero or positive if this literal's value is less than, equal to or
+     *     greater than {@code o}
+     * @throws RuntimeException if the value is neither {@link Comparable} nor a {@code byte[]}
+     */
+    public int compareValueTo(Object o) {
+        if (value instanceof Comparable) {
+            return ((Comparable<Object>) value).compareTo(o);
+        } else if (value instanceof byte[]) {
+            return BINARY_COMPARATOR.compare((byte[]) value, (byte[]) o);
+        } else {
+            throw new RuntimeException("Unsupported type: " + type);
+        }
+    }
+
+    /** Writes the default fields, then the transient value via the type's serializer. */
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        InternalSerializers.create(type).serialize(value, new DataOutputViewStreamWrapper(out));
+    }
+
+    /** Mirror of {@link #writeObject}: restores the default fields, then the value. */
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        // Must be defaultReadObject(), the counterpart of defaultWriteObject(): it restores
+        // `type`, which the next line needs. A plain readObject() would leave `type` null and
+        // consume the stream out of step with what writeObject produced.
+        in.defaultReadObject();
+        value = InternalSerializers.create(type).deserialize(new DataInputViewStreamWrapper(in));
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
new file mode 100644
index 0000000..94232a9
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Predicate} that matches when the field at a given position is non-null and differs
+ * from a literal.
+ */
+public class NotEqual implements Predicate {
+
+    private final int index;
+
+    private final Literal literal;
+
+    public NotEqual(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    /** Row-level test; a null field never matches. */
+    @Override
+    public boolean test(Object[] values) {
+        Object field = values[index];
+        return field != null && literal.compareValueTo(field) != 0;
+    }
+
+    /**
+     * Stats-level test.
+     *
+     * @return {@code false} when every row is null or when min and max both equal the literal
+     *     (every non-null value then equals it); {@code true} otherwise
+     */
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats stats = fieldStats[index];
+        if (rowCount == stats.nullCount()) {
+            // An all-null column cannot pass the row-level test, which rejects null fields.
+            // This also avoids comparing against min/max when there is no non-null value
+            // (presumably they are null then -- TODO confirm against FieldStats).
+            return false;
+        }
+        return literal.compareValueTo(stats.minValue()) != 0
+                || literal.compareValueTo(stats.maxValue()) != 0;
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
new file mode 100644
index 0000000..6fd264f
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A {@link Predicate} that holds when at least one of its child predicates holds. */
+public class Or implements Predicate {
+
+    private final Predicate first;
+    private final Predicate second;
+
+    public Or(Predicate predicate1, Predicate predicate2) {
+        this.first = predicate1;
+        this.second = predicate2;
+    }
+
+    /** Row-level test; the second child is consulted only if the first one did not match. */
+    @Override
+    public boolean test(Object[] values) {
+        if (first.test(values)) {
+            return true;
+        }
+        return second.test(values);
+    }
+
+    /** Stats-level test; the second child is consulted only if the first one ruled out a hit. */
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        if (first.test(rowCount, fieldStats)) {
+            return true;
+        }
+        return second.test(rowCount, fieldStats);
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
new file mode 100644
index 0000000..6edb261
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A boolean predicate that can be tested against row values or against column statistics. */
+public interface Predicate {
+
+ /**
+ * Tests the predicate against the concrete column values of one row.
+ *
+ * @param values the column values of the row, addressed by field index
+ * @return {@code true} if the row is a hit, {@code false} otherwise
+ */
+ boolean test(Object[] values);
+
+ /**
+ * Tests the predicate against statistics to decide whether a hit is possible at all.
+ *
+ * @param rowCount the number of rows the statistics describe
+ * @param fieldStats the per-field statistics, addressed by field index
+ * @return {@code true} if a hit is possible (false positives are allowed), {@code false} if
+ * a hit is definitely impossible
+ */
+ boolean test(long rowCount, FieldStats[] fieldStats);
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
new file mode 100644
index 0000000..f501f59
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+import static
org.apache.flink.table.data.conversion.DataStructureConverters.getConverter;
+
+/**
+ * Convert {@link Expression} to {@link Predicate}.
+ *
+ * <p>Only call expressions are converted: AND, OR, the six comparison functions between one
+ * field reference and one literal (in either operand order), and IS NULL / IS NOT NULL on a
+ * field reference. Any other function or operand shape raises {@link UnsupportedExpression}.
+ */
+public class PredicateConverter implements ExpressionVisitor<Predicate> {
+
+    public static final PredicateConverter CONVERTER = new PredicateConverter();
+
+    /**
+     * Converts a call expression by dispatching on its function definition.
+     *
+     * @throws UnsupportedExpression if the function or the shape of its operands is unsupported
+     */
+    @Override
+    public Predicate visit(CallExpression call) {
+        FunctionDefinition func = call.getFunctionDefinition();
+        List<Expression> children = call.getChildren();
+
+        if (func == BuiltInFunctionDefinitions.AND) {
+            return new And(children.get(0).accept(this), children.get(1).accept(this));
+        } else if (func == BuiltInFunctionDefinitions.OR) {
+            return new Or(children.get(0).accept(this), children.get(1).accept(this));
+        } else if (func == BuiltInFunctionDefinitions.EQUALS) {
+            return visitBiFunction(children, Equal::new, Equal::new);
+        } else if (func == BuiltInFunctionDefinitions.NOT_EQUALS) {
+            return visitBiFunction(children, NotEqual::new, NotEqual::new);
+        } else if (func == BuiltInFunctionDefinitions.GREATER_THAN) {
+            return visitBiFunction(children, GreaterThan::new, LessThan::new);
+        } else if (func == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL) {
+            return visitBiFunction(children, GreaterOrEqual::new, LessOrEqual::new);
+        } else if (func == BuiltInFunctionDefinitions.LESS_THAN) {
+            return visitBiFunction(children, LessThan::new, GreaterThan::new);
+        } else if (func == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL) {
+            return visitBiFunction(children, LessOrEqual::new, GreaterOrEqual::new);
+        } else if (func == BuiltInFunctionDefinitions.IS_NULL) {
+            return extractFieldReference(children.get(0))
+                    .map(IsNull::new)
+                    .orElseThrow(UnsupportedExpression::new);
+        } else if (func == BuiltInFunctionDefinitions.IS_NOT_NULL) {
+            return extractFieldReference(children.get(0))
+                    .map(IsNotNull::new)
+                    .orElseThrow(UnsupportedExpression::new);
+        }
+
+        // TODO is_xxx, between_xxx, like, similar, in, not_in, not?
+
+        throw new UnsupportedExpression();
+    }
+
+    /**
+     * Converts a binary comparison between a field reference and a literal.
+     *
+     * @param children the two operands of the comparison
+     * @param visit1 factory used when the operands are (field, literal)
+     * @param visit2 factory used when the operands are (literal, field); callers pass the
+     *     mirrored comparison here so the predicate is always expressed from the field's side
+     * @throws UnsupportedExpression if the operands are not exactly one field and one literal
+     */
+    private Predicate visitBiFunction(
+            List<Expression> children,
+            BiFunction<Integer, Literal, Predicate> visit1,
+            BiFunction<Integer, Literal, Predicate> visit2) {
+        Optional<Integer> field = extractFieldReference(children.get(0));
+        Optional<Literal> literal;
+        if (field.isPresent()) {
+            literal = extractLiteral(children.get(1));
+            if (literal.isPresent()) {
+                return visit1.apply(field.get(), literal.get());
+            }
+        } else {
+            field = extractFieldReference(children.get(1));
+            if (field.isPresent()) {
+                literal = extractLiteral(children.get(0));
+                if (literal.isPresent()) {
+                    return visit2.apply(field.get(), literal.get());
+                }
+            }
+        }
+
+        throw new UnsupportedExpression();
+    }
+
+    /** Returns the field index if the expression is a field reference, otherwise empty. */
+    private Optional<Integer> extractFieldReference(Expression expression) {
+        if (expression instanceof FieldReferenceExpression) {
+            int reference = ((FieldReferenceExpression) expression).getFieldIndex();
+            return Optional.of(reference);
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Returns the expression as a {@link Literal} in internal data format, or empty if it is not
+     * a value literal, its type is not supported, or it carries no value.
+     */
+    private Optional<Literal> extractLiteral(Expression expression) {
+        if (!(expression instanceof ValueLiteralExpression)) {
+            return Optional.empty();
+        }
+        ValueLiteralExpression valueExpression = (ValueLiteralExpression) expression;
+        DataType type = valueExpression.getOutputDataType();
+        LogicalType logicalType = type.getLogicalType();
+        if (!supportsPredicate(logicalType)) {
+            return Optional.empty();
+        }
+        // getValueAs yields an empty Optional when the literal holds no value of the conversion
+        // class (e.g. a NULL literal). Map over it instead of calling get(), so that case is
+        // reported as UnsupportedExpression by the caller rather than NoSuchElementException.
+        return valueExpression
+                .getValueAs(type.getConversionClass())
+                .map(
+                        value ->
+                                new Literal(
+                                        logicalType,
+                                        getConverter(type).toInternalOrNull(value)));
+    }
+
+    /** Whether literals of the given type may be used in a comparison predicate. */
+    private boolean supportsPredicate(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BOOLEAN:
+            case BINARY:
+            case VARBINARY:
+            case DECIMAL:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /** Always throws: a bare literal is only handled as an operand of a call expression. */
+    @Override
+    public Predicate visit(ValueLiteralExpression valueLiteralExpression) {
+        throw new RuntimeException("Literal should be resolved in call expression.");
+    }
+
+    /** Always throws: a bare field is only handled as an operand of a call expression. */
+    @Override
+    public Predicate visit(FieldReferenceExpression fieldReferenceExpression) {
+        throw new RuntimeException("Field reference should be resolved in call expression.");
+    }
+
+    /** Always throws: type literals cannot be converted. */
+    @Override
+    public Predicate visit(TypeLiteralExpression typeLiteralExpression) {
+        throw new RuntimeException(
+                "Type literal is unsupported: " + typeLiteralExpression.asSummaryString());
+    }
+
+    /** Always throws: no other kind of expression can be converted. */
+    @Override
+    public Predicate visit(Expression expression) {
+        throw new RuntimeException("Unsupported expression: " + expression.asSummaryString());
+    }
+
+    /** Encounter an unsupported expression, the caller can choose to ignore this filter branch. */
+    public static class UnsupportedExpression extends RuntimeException {}
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
new file mode 100644
index 0000000..81065c2
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static
org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
+import static
org.apache.flink.table.store.file.predicate.PredicateConverter.CONVERTER;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for the {@link Predicate}s that {@link PredicateConverter} builds from expressions.
+ *
+ * <p>Every case converts a call expression with {@code CONVERTER} and then evaluates the
+ * result both against concrete row values and against an array of {@link FieldStats}.
+ */
+public class PredicateTest {
+
+    @Test
+    public void testEqual() {
+        Predicate equalsFive = convert(compareIntField(BuiltInFunctionDefinitions.EQUALS, 0, 5));
+
+        // row-level evaluation
+        assertThat(equalsFive.test(new Object[] {4})).isFalse();
+        assertThat(equalsFive.test(new Object[] {5})).isTrue();
+        assertThat(equalsFive.test(new Object[] {null})).isFalse();
+
+        // stats-level evaluation
+        assertThat(equalsFive.test(3, stats(new FieldStats(0, 5, 0)))).isTrue();
+        assertThat(equalsFive.test(3, stats(new FieldStats(0, 6, 0)))).isTrue();
+        assertThat(equalsFive.test(3, stats(new FieldStats(6, 7, 0)))).isFalse();
+        assertThat(equalsFive.test(1, stats(new FieldStats(null, null, 1)))).isFalse();
+    }
+
+    @Test
+    public void testNotEqual() {
+        Predicate notFive =
+                convert(compareIntField(BuiltInFunctionDefinitions.NOT_EQUALS, 0, 5));
+
+        // row-level evaluation
+        assertThat(notFive.test(new Object[] {4})).isTrue();
+        assertThat(notFive.test(new Object[] {5})).isFalse();
+        assertThat(notFive.test(new Object[] {null})).isFalse();
+
+        // stats-level evaluation
+        assertThat(notFive.test(3, stats(new FieldStats(0, 5, 0)))).isTrue();
+        assertThat(notFive.test(3, stats(new FieldStats(0, 6, 0)))).isTrue();
+        assertThat(notFive.test(3, stats(new FieldStats(6, 7, 0)))).isTrue();
+        assertThat(notFive.test(1, stats(new FieldStats(5, 5, 0)))).isFalse();
+    }
+
+    @Test
+    public void testGreater() {
+        Predicate greaterThanFive =
+                convert(compareIntField(BuiltInFunctionDefinitions.GREATER_THAN, 0, 5));
+
+        // row-level evaluation
+        assertThat(greaterThanFive.test(new Object[] {4})).isFalse();
+        assertThat(greaterThanFive.test(new Object[] {5})).isFalse();
+        assertThat(greaterThanFive.test(new Object[] {6})).isTrue();
+        assertThat(greaterThanFive.test(new Object[] {null})).isFalse();
+
+        // stats-level evaluation
+        assertThat(greaterThanFive.test(3, stats(new FieldStats(0, 4, 0)))).isFalse();
+        assertThat(greaterThanFive.test(3, stats(new FieldStats(0, 5, 0)))).isFalse();
+        assertThat(greaterThanFive.test(3, stats(new FieldStats(0, 6, 0)))).isTrue();
+        assertThat(greaterThanFive.test(3, stats(new FieldStats(6, 7, 0)))).isTrue();
+        assertThat(greaterThanFive.test(1, stats(new FieldStats(null, null, 1)))).isFalse();
+    }
+
+    @Test
+    public void testGreaterReverse() {
+        // "5 < field" is written with the literal on the left-hand side; the assertions
+        // below are the same ones used for "field > 5".
+        CallExpression fiveLessThanField =
+                call(BuiltInFunctionDefinitions.LESS_THAN, literal(5), field(0, DataTypes.INT()));
+        Predicate greaterThanFive = convert(fiveLessThanField);
+
+        // row-level evaluation
+        assertThat(greaterThanFive.test(new Object[] {4})).isFalse();
+        assertThat(greaterThanFive.test(new Object[] {5})).isFalse();
+        assertThat(greaterThanFive.test(new Object[] {6})).isTrue();
+        assertThat(greaterThanFive.test(new Object[] {null})).isFalse();
+
+        // stats-level evaluation
+        assertThat(greaterThanFive.test(3, stats(new FieldStats(0, 4, 0)))).isFalse();
+        assertThat(greaterThanFive.test(3, stats(new FieldStats(0, 5, 0)))).isFalse();
+        assertThat(greaterThanFive.test(3, stats(new FieldStats(0, 6, 0)))).isTrue();
+        assertThat(greaterThanFive.test(3, stats(new FieldStats(6, 7, 0)))).isTrue();
+        assertThat(greaterThanFive.test(1, stats(new FieldStats(null, null, 1)))).isFalse();
+    }
+
+    @Test
+    public void testGreaterOrEqual() {
+        Predicate atLeastFive =
+                convert(compareIntField(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, 0, 5));
+
+        // row-level evaluation
+        assertThat(atLeastFive.test(new Object[] {4})).isFalse();
+        assertThat(atLeastFive.test(new Object[] {5})).isTrue();
+        assertThat(atLeastFive.test(new Object[] {6})).isTrue();
+        assertThat(atLeastFive.test(new Object[] {null})).isFalse();
+
+        // stats-level evaluation
+        assertThat(atLeastFive.test(3, stats(new FieldStats(0, 4, 0)))).isFalse();
+        assertThat(atLeastFive.test(3, stats(new FieldStats(0, 5, 0)))).isTrue();
+        assertThat(atLeastFive.test(3, stats(new FieldStats(0, 6, 0)))).isTrue();
+        assertThat(atLeastFive.test(3, stats(new FieldStats(6, 7, 0)))).isTrue();
+        assertThat(atLeastFive.test(1, stats(new FieldStats(null, null, 1)))).isFalse();
+    }
+
+    @Test
+    public void testLess() {
+        Predicate lessThanFive =
+                convert(compareIntField(BuiltInFunctionDefinitions.LESS_THAN, 0, 5));
+
+        // row-level evaluation
+        assertThat(lessThanFive.test(new Object[] {4})).isTrue();
+        assertThat(lessThanFive.test(new Object[] {5})).isFalse();
+        assertThat(lessThanFive.test(new Object[] {6})).isFalse();
+        assertThat(lessThanFive.test(new Object[] {null})).isFalse();
+
+        // stats-level evaluation
+        assertThat(lessThanFive.test(3, stats(new FieldStats(6, 7, 0)))).isFalse();
+        assertThat(lessThanFive.test(3, stats(new FieldStats(5, 7, 0)))).isFalse();
+        assertThat(lessThanFive.test(3, stats(new FieldStats(4, 7, 0)))).isTrue();
+        assertThat(lessThanFive.test(1, stats(new FieldStats(null, null, 1)))).isFalse();
+    }
+
+    @Test
+    public void testLessOrEqual() {
+        Predicate atMostFive =
+                convert(compareIntField(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, 0, 5));
+
+        // row-level evaluation
+        assertThat(atMostFive.test(new Object[] {4})).isTrue();
+        assertThat(atMostFive.test(new Object[] {5})).isTrue();
+        assertThat(atMostFive.test(new Object[] {6})).isFalse();
+        assertThat(atMostFive.test(new Object[] {null})).isFalse();
+
+        // stats-level evaluation
+        assertThat(atMostFive.test(3, stats(new FieldStats(6, 7, 0)))).isFalse();
+        assertThat(atMostFive.test(3, stats(new FieldStats(5, 7, 0)))).isTrue();
+        assertThat(atMostFive.test(3, stats(new FieldStats(4, 7, 0)))).isTrue();
+        assertThat(atMostFive.test(1, stats(new FieldStats(null, null, 1)))).isFalse();
+    }
+
+    @Test
+    public void testIsNull() {
+        // NOTE(review): a typed null literal is passed as a second argument here; it looks
+        // unused by the conversion - confirm against PredicateConverter.
+        CallExpression isNullCall =
+                call(
+                        BuiltInFunctionDefinitions.IS_NULL,
+                        field(0, DataTypes.INT()),
+                        literal(null, DataTypes.INT()));
+        Predicate isNull = convert(isNullCall);
+
+        // row-level evaluation
+        assertThat(isNull.test(new Object[] {4})).isFalse();
+        assertThat(isNull.test(new Object[] {null})).isTrue();
+
+        // stats-level evaluation
+        assertThat(isNull.test(3, stats(new FieldStats(6, 7, 0)))).isFalse();
+        assertThat(isNull.test(3, stats(new FieldStats(5, 7, 1)))).isTrue();
+    }
+
+    @Test
+    public void testIsNotNull() {
+        // NOTE(review): a typed null literal is passed as a second argument here; it looks
+        // unused by the conversion - confirm against PredicateConverter.
+        CallExpression isNotNullCall =
+                call(
+                        BuiltInFunctionDefinitions.IS_NOT_NULL,
+                        field(0, DataTypes.INT()),
+                        literal(null, DataTypes.INT()));
+        Predicate isNotNull = convert(isNotNullCall);
+
+        // row-level evaluation
+        assertThat(isNotNull.test(new Object[] {4})).isTrue();
+        assertThat(isNotNull.test(new Object[] {null})).isFalse();
+
+        // stats-level evaluation
+        assertThat(isNotNull.test(3, stats(new FieldStats(6, 7, 0)))).isTrue();
+        assertThat(isNotNull.test(3, stats(new FieldStats(5, 7, 1)))).isTrue();
+        assertThat(isNotNull.test(3, stats(new FieldStats(null, null, 3)))).isFalse();
+    }
+
+    @Test
+    public void testAnd() {
+        CallExpression bothMatch =
+                call(
+                        BuiltInFunctionDefinitions.AND,
+                        compareIntField(BuiltInFunctionDefinitions.EQUALS, 0, 3),
+                        compareIntField(BuiltInFunctionDefinitions.EQUALS, 1, 5));
+        Predicate and = convert(bothMatch);
+
+        // row-level evaluation
+        assertThat(and.test(new Object[] {4, 5})).isFalse();
+        assertThat(and.test(new Object[] {3, 6})).isFalse();
+        assertThat(and.test(new Object[] {3, 5})).isTrue();
+        assertThat(and.test(new Object[] {null, 5})).isFalse();
+
+        // stats-level evaluation, one FieldStats per field
+        FieldStats[] bothInRange = stats(new FieldStats(3, 6, 0), new FieldStats(4, 6, 0));
+        assertThat(and.test(3, bothInRange)).isTrue();
+
+        FieldStats[] secondOutOfRange = stats(new FieldStats(3, 6, 0), new FieldStats(6, 8, 0));
+        assertThat(and.test(3, secondOutOfRange)).isFalse();
+
+        FieldStats[] firstOutOfRange = stats(new FieldStats(6, 7, 0), new FieldStats(4, 6, 0));
+        assertThat(and.test(3, firstOutOfRange)).isFalse();
+    }
+
+    @Test
+    public void testOr() {
+        CallExpression eitherMatches =
+                call(
+                        BuiltInFunctionDefinitions.OR,
+                        compareIntField(BuiltInFunctionDefinitions.EQUALS, 0, 3),
+                        compareIntField(BuiltInFunctionDefinitions.EQUALS, 1, 5));
+        Predicate or = convert(eitherMatches);
+
+        // row-level evaluation
+        assertThat(or.test(new Object[] {4, 6})).isFalse();
+        assertThat(or.test(new Object[] {3, 6})).isTrue();
+        assertThat(or.test(new Object[] {3, 5})).isTrue();
+        assertThat(or.test(new Object[] {null, 5})).isTrue();
+
+        // stats-level evaluation, one FieldStats per field
+        FieldStats[] bothInRange = stats(new FieldStats(3, 6, 0), new FieldStats(4, 6, 0));
+        assertThat(or.test(3, bothInRange)).isTrue();
+
+        FieldStats[] onlyFirstInRange = stats(new FieldStats(3, 6, 0), new FieldStats(6, 8, 0));
+        assertThat(or.test(3, onlyFirstInRange)).isTrue();
+
+        FieldStats[] noneInRange = stats(new FieldStats(6, 7, 0), new FieldStats(8, 10, 0));
+        assertThat(or.test(3, noneInRange)).isFalse();
+    }
+
+    @Test
+    public void testUnsupportedExpression() {
+        CallExpression andWithLike =
+                call(
+                        BuiltInFunctionDefinitions.AND,
+                        compareIntField(BuiltInFunctionDefinitions.EQUALS, 0, 3),
+                        compareIntField(BuiltInFunctionDefinitions.LIKE, 1, 5));
+
+        assertThatThrownBy(() -> convert(andWithLike))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+    }
+
+    @Test
+    public void testUnsupportedType() {
+        DataType rowType = DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class);
+        CallExpression rowEquals =
+                call(
+                        BuiltInFunctionDefinitions.EQUALS,
+                        field(0, rowType),
+                        literal(Row.of(1), rowType));
+
+        assertThatThrownBy(() -> convert(rowEquals))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+    }
+
+    /** Runs the given call expression through {@code CONVERTER}. */
+    private Predicate convert(CallExpression expression) {
+        return expression.accept(CONVERTER);
+    }
+
+    /** Builds {@code function(<INT field at fieldIndex>, <literal value>)}. */
+    private CallExpression compareIntField(FunctionDefinition function, int fieldIndex, int value) {
+        return call(function, field(fieldIndex, DataTypes.INT()), literal(value));
+    }
+
+    /** Collects the given statistics into the array passed to the stats-based {@code test}. */
+    private FieldStats[] stats(FieldStats... perField) {
+        return perField;
+    }
+
+    /** Creates a reference named {@code "name"} to field {@code i} of input 0. */
+    private FieldReferenceExpression field(int i, DataType type) {
+        return new FieldReferenceExpression("name", type, 0, i);
+    }
+
+    /** Creates a BOOLEAN-typed call of {@code function} on the given arguments. */
+    private CallExpression call(FunctionDefinition function, ResolvedExpression... args) {
+        return new CallExpression(function, Arrays.asList(args), DataTypes.BOOLEAN());
+    }
+}