This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 82af5e6  [FLINK-25643] Introduce Predicate to file store
82af5e6 is described below

commit 82af5e63f0a6aded5bdeb7e147cc58721f7154aa
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 14 16:30:55 2022 +0800

    [FLINK-25643] Introduce Predicate to file store
    
    This closes #7
---
 .../flink/table/store/file/predicate/And.java      |  43 +++
 .../flink/table/store/file/predicate/Equal.java    |  52 ++++
 .../table/store/file/predicate/GreaterOrEqual.java |  51 ++++
 .../table/store/file/predicate/GreaterThan.java    |  51 ++++
 .../table/store/file/predicate/IsNotNull.java      |  41 +++
 .../flink/table/store/file/predicate/IsNull.java   |  41 +++
 .../table/store/file/predicate/LessOrEqual.java    |  51 ++++
 .../flink/table/store/file/predicate/LessThan.java |  51 ++++
 .../flink/table/store/file/predicate/Literal.java  |  76 +++++
 .../flink/table/store/file/predicate/NotEqual.java |  49 +++
 .../flink/table/store/file/predicate/Or.java       |  43 +++
 .../table/store/file/predicate/Predicate.java      |  40 +++
 .../store/file/predicate/PredicateConverter.java   | 179 +++++++++++
 .../table/store/file/predicate/PredicateTest.java  | 334 +++++++++++++++++++++
 14 files changed, 1102 insertions(+)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
new file mode 100644
index 0000000..b926902
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A {@link Predicate} that hits only when both of its child predicates hit (logical AND). */
+public class And implements Predicate {
+
+    private final Predicate predicate1;
+    private final Predicate predicate2;
+
+    public And(Predicate predicate1, Predicate predicate2) {
+        this.predicate1 = predicate1;
+        this.predicate2 = predicate2;
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        return predicate1.test(values) && predicate2.test(values); // predicate2 skipped on a miss
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        return predicate1.test(rowCount, fieldStats) && 
predicate2.test(rowCount, fieldStats);
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
new file mode 100644
index 0000000..cfec2e1
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Equal.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link Predicate} matching non-null fields that compare equal to a literal. */
+public class Equal implements Predicate {
+
+    private final int index; // position of the tested field in the values / fieldStats arrays
+
+    private final Literal literal;
+
+    public Equal(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        Object field = values[index];
+        return field != null && literal.compareValueTo(field) == 0; // a null field never matches
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats stats = fieldStats[index];
+        if (rowCount == stats.nullCount()) { // null count equals row count: nothing can match
+            return false;
+        }
+        return literal.compareValueTo(stats.minValue()) >= 0 // literal >= min
+                && literal.compareValueTo(stats.maxValue()) <= 0; // literal <= max
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
new file mode 100644
index 0000000..9188b5d
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterOrEqual.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link Predicate} matching non-null fields that are greater than or equal to a literal. */
+public class GreaterOrEqual implements Predicate {
+
+    private final int index; // position of the tested field in the values / fieldStats arrays
+
+    private final Literal literal;
+
+    public GreaterOrEqual(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        Object field = values[index];
+        return field != null && literal.compareValueTo(field) <= 0; // a null field never matches
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats stats = fieldStats[index];
+        if (rowCount == stats.nullCount()) { // null count equals row count: nothing can match
+            return false;
+        }
+        return literal.compareValueTo(stats.maxValue()) <= 0; // hit possible only if max >= literal
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
new file mode 100644
index 0000000..25d24ac
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/GreaterThan.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link Predicate} matching non-null fields that are strictly greater than a literal. */
+public class GreaterThan implements Predicate {
+
+    private final int index; // position of the tested field in the values / fieldStats arrays
+
+    private final Literal literal;
+
+    public GreaterThan(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        Object field = values[index];
+        return field != null && literal.compareValueTo(field) < 0; // a null field never matches
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats stats = fieldStats[index];
+        if (rowCount == stats.nullCount()) { // null count equals row count: nothing can match
+            return false;
+        }
+        return literal.compareValueTo(stats.maxValue()) < 0; // hit possible only if max > literal
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
new file mode 100644
index 0000000..7cb9a3e
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A {@link Predicate} matching fields whose value is not null. */
+public class IsNotNull implements Predicate {
+
+    private final int index; // position of the tested field in the values / fieldStats arrays
+
+    public IsNotNull(int index) {
+        this.index = index;
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        return values[index] != null;
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        return fieldStats[index].nullCount() < rowCount; // fewer nulls than rows: some non-null
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
new file mode 100644
index 0000000..e3df845
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A {@link Predicate} matching fields whose value is null. */
+public class IsNull implements Predicate {
+
+    private final int index; // position of the tested field in the values / fieldStats arrays
+
+    public IsNull(int index) {
+        this.index = index;
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        return values[index] == null;
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        return fieldStats[index].nullCount() > 0; // a hit needs at least one recorded null
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
new file mode 100644
index 0000000..ed7695c
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessOrEqual.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link Predicate} matching non-null fields that are less than or equal to a literal. */
+public class LessOrEqual implements Predicate {
+
+    private final int index; // position of the tested field in the values / fieldStats arrays
+
+    private final Literal literal;
+
+    public LessOrEqual(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        Object field = values[index];
+        return field != null && literal.compareValueTo(field) >= 0; // a null field never matches
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats stats = fieldStats[index];
+        if (rowCount == stats.nullCount()) { // null count equals row count: nothing can match
+            return false;
+        }
+        return literal.compareValueTo(stats.minValue()) >= 0; // hit possible only if min <= literal
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
new file mode 100644
index 0000000..2e644c0
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LessThan.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link Predicate} matching non-null fields that are strictly less than a literal. */
+public class LessThan implements Predicate {
+
+    private final int index; // position of the tested field in the values / fieldStats arrays
+
+    private final Literal literal;
+
+    public LessThan(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        Object field = values[index];
+        return field != null && literal.compareValueTo(field) > 0; // a null field never matches
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats stats = fieldStats[index];
+        if (rowCount == stats.nullCount()) { // null count equals row count: nothing can match
+            return false;
+        }
+        return literal.compareValueTo(stats.minValue()) > 0; // hit possible only if min < literal
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
new file mode 100644
index 0000000..b83e90d
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Literal.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/** A serializable literal: a value plus the {@link LogicalType} used to (de)serialize it. */
+public class Literal implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final BytePrimitiveArrayComparator BINARY_COMPARATOR =
+            new BytePrimitiveArrayComparator(true);
+
+    private final LogicalType type;
+
+    private transient Object value; // transient: written/read by hand in writeObject/readObject
+
+    public Literal(LogicalType type, Object value) {
+        this.type = type;
+        this.value = value;
+    }
+
+    public LogicalType type() {
+        return type;
+    }
+
+    public Object value() {
+        return value;
+    }
+
+    public int compareValueTo(Object o) { // <0, 0 or >0 as this literal's value is <, == or > o
+        if (value instanceof Comparable) {
+            return ((Comparable<Object>) value).compareTo(o);
+        } else if (value instanceof byte[]) {
+            return BINARY_COMPARATOR.compare((byte[]) value, (byte[]) o);
+        } else {
+            throw new RuntimeException("Unsupported type: " + type);
+        }
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject(); // writes the non-transient fields, i.e. 'type'
+        InternalSerializers.create(type).serialize(value, new DataOutputViewStreamWrapper(out));
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject(); // must mirror defaultWriteObject(): restores 'type' used below
+        value = InternalSerializers.create(type).deserialize(new DataInputViewStreamWrapper(in));
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
new file mode 100644
index 0000000..94232a9
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotEqual.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link Predicate} matching non-null fields that do not compare equal to a literal. */
+public class NotEqual implements Predicate {
+
+    private final int index; // position of the tested field in the values / fieldStats arrays
+
+    private final Literal literal;
+
+    public NotEqual(int index, Literal literal) {
+        this.index = index;
+        this.literal = checkNotNull(literal);
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        Object field = values[index];
+        return field != null && literal.compareValueTo(field) != 0; // a null field never matches
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        FieldStats stats = fieldStats[index]; // all-null field: no hit, and min/max are not compared
+        return rowCount != stats.nullCount() && (literal.compareValueTo(stats.minValue()) != 0
+                || literal.compareValueTo(stats.maxValue()) != 0); // false if min == max == literal
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
new file mode 100644
index 0000000..6fd264f
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A {@link Predicate} that hits when at least one of its child predicates hits (logical OR). */
+public class Or implements Predicate {
+
+    private final Predicate predicate1;
+    private final Predicate predicate2;
+
+    public Or(Predicate predicate1, Predicate predicate2) {
+        this.predicate1 = predicate1;
+        this.predicate2 = predicate2;
+    }
+
+    @Override
+    public boolean test(Object[] values) {
+        return predicate1.test(values) || predicate2.test(values); // predicate2 skipped on a hit
+    }
+
+    @Override
+    public boolean test(long rowCount, FieldStats[] fieldStats) {
+        return predicate1.test(rowCount, fieldStats) || 
predicate2.test(rowCount, fieldStats);
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
new file mode 100644
index 0000000..6edb261
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Predicate.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+/** A boolean filter that can be tested against concrete values or against field statistics. */
+public interface Predicate {
+
+    /**
+     * Tests this predicate against the specific input column values.
+     *
+     * @return true if the values hit (satisfy) the predicate, false otherwise.
+     */
+    boolean test(Object[] values);
+
+    /**
+     * Tests this predicate against statistical information to decide whether a hit is possible.
+     *
+     * @return true if a hit is possible (false positives are allowed); false only if a hit is
+     *     definitely impossible.
+     */
+    boolean test(long rowCount, FieldStats[] fieldStats);
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
new file mode 100644
index 0000000..f501f59
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+import static 
org.apache.flink.table.data.conversion.DataStructureConverters.getConverter;
+
+/**
+ * Convert {@link Expression} to {@link Predicate}.
+ *
+ * <p>Supported calls are AND, OR, the six comparison functions between a field reference and a
+ * literal (in either order), and IS NULL / IS NOT NULL on a field reference. Every other call, as
+ * well as a comparison whose operands are not one field reference and one supported non-null
+ * literal, raises {@link UnsupportedExpression}. Expressions other than calls that are visited
+ * directly raise a plain {@link RuntimeException}.
+ */
+public class PredicateConverter implements ExpressionVisitor<Predicate> {
+
+    public static final PredicateConverter CONVERTER = new PredicateConverter();
+
+    /**
+     * Converts a function call into a predicate.
+     *
+     * @param call the call expression to convert
+     * @return the predicate equivalent to the call
+     * @throws UnsupportedExpression if the function or its operands cannot be converted
+     */
+    @Override
+    public Predicate visit(CallExpression call) {
+        FunctionDefinition func = call.getFunctionDefinition();
+        List<Expression> children = call.getChildren();
+
+        if (func == BuiltInFunctionDefinitions.AND) {
+            return new And(children.get(0).accept(this), children.get(1).accept(this));
+        } else if (func == BuiltInFunctionDefinitions.OR) {
+            return new Or(children.get(0).accept(this), children.get(1).accept(this));
+        } else if (func == BuiltInFunctionDefinitions.EQUALS) {
+            return visitBiFunction(children, Equal::new, Equal::new);
+        } else if (func == BuiltInFunctionDefinitions.NOT_EQUALS) {
+            return visitBiFunction(children, NotEqual::new, NotEqual::new);
+        } else if (func == BuiltInFunctionDefinitions.GREATER_THAN) {
+            return visitBiFunction(children, GreaterThan::new, LessThan::new);
+        } else if (func == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL) {
+            return visitBiFunction(children, GreaterOrEqual::new, LessOrEqual::new);
+        } else if (func == BuiltInFunctionDefinitions.LESS_THAN) {
+            return visitBiFunction(children, LessThan::new, GreaterThan::new);
+        } else if (func == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL) {
+            return visitBiFunction(children, LessOrEqual::new, GreaterOrEqual::new);
+        } else if (func == BuiltInFunctionDefinitions.IS_NULL) {
+            return extractFieldReference(children.get(0))
+                    .map(IsNull::new)
+                    .orElseThrow(UnsupportedExpression::new);
+        } else if (func == BuiltInFunctionDefinitions.IS_NOT_NULL) {
+            return extractFieldReference(children.get(0))
+                    .map(IsNotNull::new)
+                    .orElseThrow(UnsupportedExpression::new);
+        }
+
+        // TODO is_xxx, between_xxx, like, similar, in, not_in, not?
+
+        throw new UnsupportedExpression();
+    }
+
+    /**
+     * Converts a binary comparison between a field reference and a literal.
+     *
+     * @param children the two operands of the comparison
+     * @param visit1 factory used when the operands are in "field op literal" order
+     * @param visit2 factory used when the operands are in "literal op field" order; callers pass
+     *     the mirrored operator here, e.g. {@code 5 < f} is converted as {@code f > 5}
+     * @return the predicate for the comparison
+     * @throws UnsupportedExpression if the operands are not one field reference and one
+     *     supported literal
+     */
+    private Predicate visitBiFunction(
+            List<Expression> children,
+            BiFunction<Integer, Literal, Predicate> visit1,
+            BiFunction<Integer, Literal, Predicate> visit2) {
+        Optional<Integer> field = extractFieldReference(children.get(0));
+        Optional<Literal> literal;
+        if (field.isPresent()) {
+            literal = extractLiteral(children.get(1));
+            if (literal.isPresent()) {
+                return visit1.apply(field.get(), literal.get());
+            }
+        } else {
+            field = extractFieldReference(children.get(1));
+            if (field.isPresent()) {
+                literal = extractLiteral(children.get(0));
+                if (literal.isPresent()) {
+                    return visit2.apply(field.get(), literal.get());
+                }
+            }
+        }
+
+        throw new UnsupportedExpression();
+    }
+
+    /** Returns the field index if the expression is a field reference, otherwise empty. */
+    private Optional<Integer> extractFieldReference(Expression expression) {
+        if (expression instanceof FieldReferenceExpression) {
+            int reference = ((FieldReferenceExpression) expression).getFieldIndex();
+            return Optional.of(reference);
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Returns the literal, converted to its internal data structure, if the expression is a
+     * non-null value literal of a supported type; otherwise empty.
+     */
+    private Optional<Literal> extractLiteral(Expression expression) {
+        if (!(expression instanceof ValueLiteralExpression)) {
+            return Optional.empty();
+        }
+        ValueLiteralExpression valueExpression = (ValueLiteralExpression) expression;
+        DataType type = valueExpression.getOutputDataType();
+        if (!supportsPredicate(type.getLogicalType())) {
+            return Optional.empty();
+        }
+        // getValueAs yields an empty Optional for a NULL literal. Map over it instead of calling
+        // get(), so that a comparison with NULL is reported as UnsupportedExpression rather than
+        // escaping as a NoSuchElementException.
+        return valueExpression
+                .getValueAs(type.getConversionClass())
+                .map(
+                        value ->
+                                new Literal(
+                                        type.getLogicalType(),
+                                        getConverter(type).toInternalOrNull(value)));
+    }
+
+    /** Returns whether literals of the given type can be used in a predicate. */
+    private boolean supportsPredicate(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BOOLEAN:
+            case BINARY:
+            case VARBINARY:
+            case DECIMAL:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    @Override
+    public Predicate visit(ValueLiteralExpression valueLiteralExpression) {
+        throw new RuntimeException("Literal should be resolved in call expression.");
+    }
+
+    @Override
+    public Predicate visit(FieldReferenceExpression fieldReferenceExpression) {
+        throw new RuntimeException("Field reference should be resolved in call expression.");
+    }
+
+    @Override
+    public Predicate visit(TypeLiteralExpression typeLiteralExpression) {
+        throw new RuntimeException(
+                "Type literal is unsupported: " + typeLiteralExpression.asSummaryString());
+    }
+
+    @Override
+    public Predicate visit(Expression expression) {
+        throw new RuntimeException("Unsupported expression: " + expression.asSummaryString());
+    }
+
+    /** Encounter an unsupported expression, the caller can choose to ignore this filter branch. */
+    public static class UnsupportedExpression extends RuntimeException {}
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
new file mode 100644
index 0000000..81065c2
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
+import static 
org.apache.flink.table.store.file.predicate.PredicateConverter.CONVERTER;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Test for {@link Predicate}s.
+ *
+ * <p>Each test converts a Flink expression with {@link PredicateConverter} and then checks the
+ * resulting predicate both against concrete row values and against field statistics.
+ */
+public class PredicateTest {
+
+    @Test
+    public void testEqual() {
+        CallExpression expression =
+                call(BuiltInFunctionDefinitions.EQUALS, field(0, DataTypes.INT()), literal(5));
+        Predicate predicate = expression.accept(CONVERTER);
+
+        assertThat(predicate.test(new Object[] {4})).isFalse();
+        assertThat(predicate.test(new Object[] {5})).isTrue();
+        assertThat(predicate.test(new Object[] {null})).isFalse();
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isTrue();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0)})).isTrue();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isFalse();
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)})).isFalse();
+    }
+
+    @Test
+    public void testNotEqual() {
+        CallExpression expression =
+                call(BuiltInFunctionDefinitions.NOT_EQUALS, field(0, DataTypes.INT()), literal(5));
+        Predicate predicate = expression.accept(CONVERTER);
+
+        assertThat(predicate.test(new Object[] {4})).isTrue();
+        assertThat(predicate.test(new Object[] {5})).isFalse();
+        assertThat(predicate.test(new Object[] {null})).isFalse();
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isTrue();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0)})).isTrue();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isTrue();
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(5, 5, 0)})).isFalse();
+    }
+
+    @Test
+    public void testGreater() {
+        CallExpression expression =
+                call(
+                        BuiltInFunctionDefinitions.GREATER_THAN,
+                        field(0, DataTypes.INT()),
+                        literal(5));
+        Predicate predicate = expression.accept(CONVERTER);
+
+        assertThat(predicate.test(new Object[] {4})).isFalse();
+        assertThat(predicate.test(new Object[] {5})).isFalse();
+        assertThat(predicate.test(new Object[] {6})).isTrue();
+        assertThat(predicate.test(new Object[] {null})).isFalse();
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 0)})).isFalse();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isFalse();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0)})).isTrue();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isTrue();
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)})).isFalse();
+    }
+
+    /** {@code 5 < f} must be converted to the mirrored predicate {@code f > 5}. */
+    @Test
+    public void testGreaterReverse() {
+        CallExpression expression =
+                call(BuiltInFunctionDefinitions.LESS_THAN, literal(5), field(0, DataTypes.INT()));
+        Predicate predicate = expression.accept(CONVERTER);
+
+        assertThat(predicate.test(new Object[] {4})).isFalse();
+        assertThat(predicate.test(new Object[] {5})).isFalse();
+        assertThat(predicate.test(new Object[] {6})).isTrue();
+        assertThat(predicate.test(new Object[] {null})).isFalse();
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 0)})).isFalse();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isFalse();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0)})).isTrue();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isTrue();
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)})).isFalse();
+    }
+
+    @Test
+    public void testGreaterOrEqual() {
+        CallExpression expression =
+                call(
+                        BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
+                        field(0, DataTypes.INT()),
+                        literal(5));
+        Predicate predicate = expression.accept(CONVERTER);
+
+        assertThat(predicate.test(new Object[] {4})).isFalse();
+        assertThat(predicate.test(new Object[] {5})).isTrue();
+        assertThat(predicate.test(new Object[] {6})).isTrue();
+        assertThat(predicate.test(new Object[] {null})).isFalse();
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 0)})).isFalse();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isTrue();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0)})).isTrue();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isTrue();
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)})).isFalse();
+    }
+
+    @Test
+    public void testLess() {
+        CallExpression expression =
+                call(BuiltInFunctionDefinitions.LESS_THAN, field(0, DataTypes.INT()), literal(5));
+        Predicate predicate = expression.accept(CONVERTER);
+
+        assertThat(predicate.test(new Object[] {4})).isTrue();
+        assertThat(predicate.test(new Object[] {5})).isFalse();
+        assertThat(predicate.test(new Object[] {6})).isFalse();
+        assertThat(predicate.test(new Object[] {null})).isFalse();
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isFalse();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 0)})).isFalse();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(4, 7, 0)})).isTrue();
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)})).isFalse();
+    }
+
+    @Test
+    public void testLessOrEqual() {
+        CallExpression expression =
+                call(
+                        BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+                        field(0, DataTypes.INT()),
+                        literal(5));
+        Predicate predicate = expression.accept(CONVERTER);
+
+        assertThat(predicate.test(new Object[] {4})).isTrue();
+        assertThat(predicate.test(new Object[] {5})).isTrue();
+        assertThat(predicate.test(new Object[] {6})).isFalse();
+        assertThat(predicate.test(new Object[] {null})).isFalse();
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isFalse();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 0)})).isTrue();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(4, 7, 0)})).isTrue();
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)})).isFalse();
+    }
+
+    @Test
+    public void testIsNull() {
+        // IS_NULL is a unary function: the field reference is its only operand.
+        CallExpression expression =
+                call(BuiltInFunctionDefinitions.IS_NULL, field(0, DataTypes.INT()));
+        Predicate predicate = expression.accept(CONVERTER);
+
+        assertThat(predicate.test(new Object[] {4})).isFalse();
+        assertThat(predicate.test(new Object[] {null})).isTrue();
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isFalse();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 1)})).isTrue();
+    }
+
+    @Test
+    public void testIsNotNull() {
+        // IS_NOT_NULL is a unary function: the field reference is its only operand.
+        CallExpression expression =
+                call(BuiltInFunctionDefinitions.IS_NOT_NULL, field(0, DataTypes.INT()));
+        Predicate predicate = expression.accept(CONVERTER);
+
+        assertThat(predicate.test(new Object[] {4})).isTrue();
+        assertThat(predicate.test(new Object[] {null})).isFalse();
+
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isTrue();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 1)})).isTrue();
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, null, 3)})).isFalse();
+    }
+
+    @Test
+    public void testAnd() {
+        CallExpression expression =
+                call(
+                        BuiltInFunctionDefinitions.AND,
+                        call(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                field(0, DataTypes.INT()),
+                                literal(3)),
+                        call(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                field(1, DataTypes.INT()),
+                                literal(5)));
+        Predicate predicate = expression.accept(CONVERTER);
+
+        assertThat(predicate.test(new Object[] {4, 5})).isFalse();
+        assertThat(predicate.test(new Object[] {3, 6})).isFalse();
+        assertThat(predicate.test(new Object[] {3, 5})).isTrue();
+        assertThat(predicate.test(new Object[] {null, 5})).isFalse();
+
+        assertThat(
+                        predicate.test(
+                                3,
+                                new FieldStats[] {
+                                    new FieldStats(3, 6, 0), new FieldStats(4, 6, 0)
+                                }))
+                .isTrue();
+        assertThat(
+                        predicate.test(
+                                3,
+                                new FieldStats[] {
+                                    new FieldStats(3, 6, 0), new FieldStats(6, 8, 0)
+                                }))
+                .isFalse();
+        assertThat(
+                        predicate.test(
+                                3,
+                                new FieldStats[] {
+                                    new FieldStats(6, 7, 0), new FieldStats(4, 6, 0)
+                                }))
+                .isFalse();
+    }
+
+    @Test
+    public void testOr() {
+        CallExpression expression =
+                call(
+                        BuiltInFunctionDefinitions.OR,
+                        call(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                field(0, DataTypes.INT()),
+                                literal(3)),
+                        call(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                field(1, DataTypes.INT()),
+                                literal(5)));
+        Predicate predicate = expression.accept(CONVERTER);
+
+        assertThat(predicate.test(new Object[] {4, 6})).isFalse();
+        assertThat(predicate.test(new Object[] {3, 6})).isTrue();
+        assertThat(predicate.test(new Object[] {3, 5})).isTrue();
+        assertThat(predicate.test(new Object[] {null, 5})).isTrue();
+
+        assertThat(
+                        predicate.test(
+                                3,
+                                new FieldStats[] {
+                                    new FieldStats(3, 6, 0), new FieldStats(4, 6, 0)
+                                }))
+                .isTrue();
+        assertThat(
+                        predicate.test(
+                                3,
+                                new FieldStats[] {
+                                    new FieldStats(3, 6, 0), new FieldStats(6, 8, 0)
+                                }))
+                .isTrue();
+        assertThat(
+                        predicate.test(
+                                3,
+                                new FieldStats[] {
+                                    new FieldStats(6, 7, 0), new FieldStats(8, 10, 0)
+                                }))
+                .isFalse();
+    }
+
+    /** A single unsupported branch (LIKE) makes the whole AND conversion fail. */
+    @Test
+    public void testUnsupportedExpression() {
+        CallExpression expression =
+                call(
+                        BuiltInFunctionDefinitions.AND,
+                        call(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                field(0, DataTypes.INT()),
+                                literal(3)),
+                        call(
+                                BuiltInFunctionDefinitions.LIKE,
+                                field(1, DataTypes.INT()),
+                                literal(5)));
+        assertThatThrownBy(() -> expression.accept(CONVERTER))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+    }
+
+    /** A literal of a type outside the supported set (here ROW) cannot be converted. */
+    @Test
+    public void testUnsupportedType() {
+        DataType structType = DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class);
+        CallExpression expression =
+                call(
+                        BuiltInFunctionDefinitions.EQUALS,
+                        field(0, structType),
+                        literal(Row.of(1), structType));
+        assertThatThrownBy(() -> expression.accept(CONVERTER))
+                .isInstanceOf(PredicateConverter.UnsupportedExpression.class);
+    }
+
+    /** Creates a reference to the field at index {@code i} of input 0. */
+    private FieldReferenceExpression field(int i, DataType type) {
+        return new FieldReferenceExpression("name", type, 0, i);
+    }
+
+    /** Creates a BOOLEAN-typed call of {@code function} with the given arguments. */
+    private CallExpression call(FunctionDefinition function, ResolvedExpression... args) {
+        return new CallExpression(function, Arrays.asList(args), DataTypes.BOOLEAN());
+    }
+}

Reply via email to