This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ae15844ab [flink] Enhance filter push down to consume partition filter 
(#2346)
ae15844ab is described below

commit ae15844ab97c30532a16d7ef962f46afa3293b4c
Author: Aitozi <[email protected]>
AuthorDate: Tue Nov 21 11:40:30 2023 +0800

    [flink] Enhance filter push down to consume partition filter (#2346)
---
 .../paimon/operation/DefaultValueAssigner.java     |   8 +-
 .../paimon/flink/source/DataTableSource.java       |   5 +
 .../paimon/flink/source/FlinkTableSource.java      |  58 +++++-
 .../paimon/flink/source/SystemTableSource.java     |   5 +
 .../flink/source/table/PushedRichTableSource.java  |   3 +-
 .../flink/source/table/PushedTableSource.java      |   3 +-
 .../paimon/flink/source/FilterPushDownITCase.java  | 149 ++++++++++++++
 .../paimon/flink/source/FlinkTableSourceTest.java  | 219 +++++++++++++++++++++
 8 files changed, 439 insertions(+), 11 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
index 730aafe8c..794423648 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
@@ -133,17 +133,17 @@ public class DefaultValueAssigner {
                             return Optional.of(predicate);
                         };
 
-                ArrayList<Predicate> filterWithouDefaultValueField = new 
ArrayList<>();
+                ArrayList<Predicate> filterWithoutDefaultValueField = new 
ArrayList<>();
 
                 List<Predicate> predicates = 
PredicateBuilder.splitAnd(filters);
                 for (Predicate predicate : predicates) {
                     predicate
                             .visit(deletePredicateWithFieldNameVisitor)
-                            .ifPresent(filterWithouDefaultValueField::add);
+                            .ifPresent(filterWithoutDefaultValueField::add);
                 }
 
-                if (!filterWithouDefaultValueField.isEmpty()) {
-                    result = 
PredicateBuilder.and(filterWithouDefaultValueField);
+                if (!filterWithoutDefaultValueField.isEmpty()) {
+                    result = 
PredicateBuilder.and(filterWithoutDefaultValueField);
                 } else {
                     result = null;
                 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index cb26a76a8..208691477 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -344,6 +344,11 @@ public class DataTableSource extends FlinkTableSource {
         this.dynamicPartitionFilteringFields = candidateFilterFields;
     }
 
+    @Override
+    public boolean isStreaming() {
+        return streaming;
+    }
+
     /** Split statistics for inferring row count and parallelism size. */
     protected static class SplitStatistics {
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index ac2ef6ce9..c35b46444 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -20,8 +20,11 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.PredicateConverter;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.table.Table;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -35,15 +38,20 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 /** A Flink {@link ScanTableSource} for paimon. */
 public abstract class FlinkTableSource {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkTableSource.class);
+
     protected final Table table;
 
     @Nullable protected Predicate predicate;
@@ -65,13 +73,55 @@ public abstract class FlinkTableSource {
         this.limit = limit;
     }
 
-    public void pushFilters(List<ResolvedExpression> filters) {
-        List<Predicate> converted = new ArrayList<>();
+    /** @return The unconsumed filters. */
+    public List<ResolvedExpression> pushFilters(List<ResolvedExpression> 
filters) {
+        List<String> partitionKeys = table.partitionKeys();
         RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType());
+
+        // The source must ensure the consumed filters are fully evaluated, 
otherwise the result
+        // of query will be wrong.
+        List<ResolvedExpression> unConsumedFilters = new ArrayList<>();
+        List<ResolvedExpression> consumedFilters = new ArrayList<>();
+        List<Predicate> converted = new ArrayList<>();
+        PredicateVisitor<Boolean> visitor =
+                new PredicateVisitor<Boolean>() {
+                    @Override
+                    public Boolean visit(LeafPredicate predicate) {
+                        return partitionKeys.contains(predicate.fieldName());
+                    }
+
+                    @Override
+                    public Boolean visit(CompoundPredicate predicate) {
+                        for (Predicate child : predicate.children()) {
+                            Boolean matched = child.visit(this);
+
+                            if (!matched) {
+                                return false;
+                            }
+                        }
+                        return true;
+                    }
+                };
+
         for (ResolvedExpression filter : filters) {
-            PredicateConverter.convert(rowType, 
filter).ifPresent(converted::add);
+            Optional<Predicate> predicateOptional = 
PredicateConverter.convert(rowType, filter);
+
+            if (!predicateOptional.isPresent()) {
+                unConsumedFilters.add(filter);
+            } else {
+                Predicate p = predicateOptional.get();
+                if (isStreaming() || !p.visit(visitor)) {
+                    unConsumedFilters.add(filter);
+                } else {
+                    consumedFilters.add(filter);
+                }
+                converted.add(p);
+            }
         }
         predicate = converted.isEmpty() ? null : 
PredicateBuilder.and(converted);
+        LOG.info("Consumed filters: {} of {}", consumedFilters, filters);
+
+        return unConsumedFilters;
     }
 
     public void pushProjection(int[][] projectedFields) {
@@ -99,4 +149,6 @@ public abstract class FlinkTableSource {
     public abstract List<String> listAcceptedFilterFields();
 
     public abstract void applyDynamicFiltering(List<String> 
candidateFilterFields);
+
+    public abstract boolean isStreaming();
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index b6cc5d85d..b577a73aa 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -134,4 +134,9 @@ public class SystemTableSource extends FlinkTableSource {
                         "Cannot apply dynamic filtering to Paimon system table 
'%s'.",
                         table.name()));
     }
+
+    @Override
+    public boolean isStreaming() {
+        return isStreamingMode;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
index 9d5083dda..7e55b83f6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
@@ -45,8 +45,7 @@ public class PushedRichTableSource extends RichTableSource
 
     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
-        source.pushFilters(filters);
-        return Result.of(filters, filters);
+        return Result.of(filters, source.pushFilters(filters));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
index c5bb17acf..a1389b5bf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
@@ -45,8 +45,7 @@ public class PushedTableSource extends BaseTableSource
 
     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
-        source.pushFilters(filters);
-        return Result.of(filters, filters);
+        return Result.of(filters, source.pushFilters(filters));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java
new file mode 100644
index 000000000..03c918ebc
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java
@@ -0,0 +1,149 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.ExplainFormat;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for filter push down. */
+public class FilterPushDownITCase extends CatalogITCaseBase {
+
+    @Override
+    public List<String> ddl() {
+        return ImmutableList.of("CREATE TABLE T (" + "a INT, b INT, c STRING) 
PARTITIONED BY (a);");
+    }
+
+    @BeforeEach
+    @Override
+    public void before() throws IOException {
+        super.before();
+        batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2'), (2, 3, '3'), 
(3, 3, '3')");
+    }
+
+    @Test
+    public void testPartitionConditionConsuming_OnePartitionCondition() {
+        String sql = "SELECT * FROM T where a = 1 limit 1";
+        assertPlanAndResult(
+                sql,
+                "+- Limit(offset=[0], fetch=[1], global=[false])\n"
+                        + "+- TableSourceScan(table=[[PAIMON, default, T, 
filter=[=(a, 1)], project=[b, c], limit=[1]]], fields=[b, c])",
+                Row.ofKind(RowKind.INSERT, 1, 1, "1"));
+    }
+
+    @Test
+    public void testPartitionConditionConsuming_PartitionConditionAndOther() {
+        String sql = "SELECT * FROM T where (a = 1 or a = 2) and c = '1' limit 
1";
+        // c = '1' is not consumed and limit 1 not push to source
+        assertPlanAndResult(
+                sql,
+                "+- Calc(select=[a, b, CAST('1' AS VARCHAR(2147483647)) AS c], 
where=[(c = '1')])\n"
+                        + "+- TableSourceScan(table=[[PAIMON, default, T, 
filter=[and(OR(=(a, 1), =(a, 2)), =(c, _UTF-16LE'1':VARCHAR(2147483647) 
CHARACTER SET \"UTF-16LE\"))]]], fields=[a, b, c])",
+                Row.ofKind(RowKind.INSERT, 1, 1, "1"));
+    }
+
+    @Test
+    public void testPartitionConditionNotConsuming1() {
+        // a = 1 not consumed
+        String sql = "SELECT * FROM T where a + 1 = 2 limit 1";
+        assertPlanAndResult(
+                sql,
+                "+- Calc(select=[a, b, c], where=[((a + 1) = 2)])\n"
+                        + "+- TableSourceScan(table=[[PAIMON, default, T, 
filter=[=(+(a, 1), 2)]]], fields=[a, b, c])",
+                Row.ofKind(RowKind.INSERT, 1, 1, "1"));
+    }
+
+    @Test
+    public void testPartitionConditionNotConsuming2() {
+        // UNIX_TIMESTAMP() > 0 not consumed
+        String sql = "SELECT * FROM T where UNIX_TIMESTAMP() > 0";
+        assertPlanAndResult(
+                sql,
+                "Calc(select=[a, b, c], where=[(UNIX_TIMESTAMP() > 0)])\n"
+                        + "+- TableSourceScan(table=[[PAIMON, default, T, 
filter=[>(UNIX_TIMESTAMP(), 0)]]], fields=[a, b, c])",
+                Row.ofKind(RowKind.INSERT, 1, 1, "1"),
+                Row.ofKind(RowKind.INSERT, 1, 2, "2"),
+                Row.ofKind(RowKind.INSERT, 2, 3, "3"),
+                Row.ofKind(RowKind.INSERT, 3, 3, "3"));
+    }
+
+    @Test
+    public void testPartitionConditionNotConsuming3() {
+        // all not consumed
+        String sql = "SELECT * FROM T where b = 3 and ( a = 2 or c = '3')";
+        assertPlanAndResult(
+                sql,
+                "Calc(select=[a, CAST(3 AS INTEGER) AS b, c], where=[((b = 3) 
AND ((a = 2) OR (c = '3')))])\n"
+                        + "+- TableSourceScan(table=[[PAIMON, default, T, 
filter=[and(=(b, 3), OR(=(a, 2), =(c, _UTF-16LE'3':VARCHAR(2147483647) 
CHARACTER SET \"UTF-16LE\")))]]], fields=[a, b, c])",
+                Row.ofKind(RowKind.INSERT, 2, 3, "3"),
+                Row.ofKind(RowKind.INSERT, 3, 3, "3"));
+    }
+
+    @Test
+    public void testStreamingReadingNotConsumePartitionCondition() throws 
TimeoutException {
+        String sql = "SELECT * FROM T WHERE a = 5";
+        String plan = sEnv.explainSql(sql, ExplainFormat.TEXT);
+        Assertions.assertThat(plan)
+                .contains(
+                        "Calc(select=[CAST(5 AS INTEGER) AS a, b, c], 
where=[(a = 5)])\n"
+                                + "+- TableSourceScan(table=[[PAIMON, default, 
T, filter=[=(a, 5)]]], fields=[a, b, c])");
+
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(sql).collect());
+        sql("INSERT INTO T VALUES (5, 5, '5'), (6, 6, '6'), (5, 5, '5_1')");
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of(5, 5, "5"), Row.of(5, 5, 
"5_1"));
+    }
+
+    @Test
+    public void testPartitionCondition_ProjectionPushDown() {
+        String sql = "SELECT b, a FROM T where a = 1 limit 1";
+        assertPlanAndResult(
+                sql,
+                "+- Limit(offset=[0], fetch=[1], global=[false])\n"
+                        + "+- TableSourceScan(table=[[PAIMON, default, T, 
filter=[=(a, 1)], project=[b], limit=[1]]], fields=[b])",
+                Row.ofKind(RowKind.INSERT, 1, 1));
+    }
+
+    private void assertPlanAndResult(String sql, String planIdentifier, Row... 
expectedRows) {
+        String plan = tEnv.explainSql(sql, ExplainFormat.TEXT);
+        String[] lines = plan.split("\n");
+        String trimmed = 
Arrays.stream(lines).map(String::trim).collect(Collectors.joining("\n"));
+        Assertions.assertThat(trimmed).contains(planIdentifier);
+        List<Row> result = batchSql(sql);
+        Assertions.assertThat(result).containsExactlyInAnyOrder(expectedRows);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
new file mode 100644
index 000000000..350fe9e3a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
@@ -0,0 +1,219 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/** Test for {@link FlinkTableSource}. */
+public class FlinkTableSourceTest extends TableTestBase {
+
+    @Test
+    public void testApplyFilterNonPartitionTable() throws Exception {
+        FileIO fileIO = LocalFileIO.create();
+        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, "T"));
+        Schema schema = Schema.newBuilder().column("col1", 
DataTypes.INT()).build();
+        TableSchema tableSchema = new SchemaManager(fileIO, 
tablePath).createTable(schema);
+        Table table = FileStoreTableFactory.create(LocalFileIO.create(), 
tablePath, tableSchema);
+        FlinkTableSource tableSource =
+                new DataTableSource(
+                        ObjectIdentifier.of("catalog1", "db1", "T"), table, 
false, null, null);
+
+        // col1 = 1
+        List<ResolvedExpression> filters = ImmutableList.of(col1Equal1());
+        
Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+    }
+
+    @Test
+    public void testApplyPartitionTable() throws Exception {
+        FileIO fileIO = LocalFileIO.create();
+        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, "T"));
+        Schema schema =
+                Schema.newBuilder()
+                        .column("col1", DataTypes.INT())
+                        .column("col2", DataTypes.INT())
+                        .column("p1", DataTypes.INT())
+                        .column("p2", DataTypes.STRING())
+                        .partitionKeys("p1", "p2")
+                        .build();
+        TableSchema tableSchema = new SchemaManager(fileIO, 
tablePath).createTable(schema);
+        Table table = FileStoreTableFactory.create(LocalFileIO.create(), 
tablePath, tableSchema);
+        FlinkTableSource tableSource =
+                new DataTableSource(
+                        ObjectIdentifier.of("catalog1", "db1", "T"), table, 
false, null, null);
+
+        // col1 = 1 && p1 = 1 => [p1 = 1]
+        List<ResolvedExpression> filters = ImmutableList.of(col1Equal1(), 
p1Equal1());
+        Assertions.assertThat(tableSource.pushFilters(filters))
+                .isEqualTo(ImmutableList.of(filters.get(0)));
+
+        // col1 = 1 && p2 like '%a' => [p2 like '%a']
+        filters = ImmutableList.of(p2Like("%a"));
+        
Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+
+        // col1 = 1 && p2 like 'a%' => None
+        filters = ImmutableList.of(col1Equal1(), p2Like("a%"));
+        Assertions.assertThat(tableSource.pushFilters(filters))
+                .isEqualTo(ImmutableList.of(filters.get(0)));
+
+        // rand(42) > 0.1 => None
+        filters = ImmutableList.of(rand());
+        
Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+
+        // upper(p1) = "A"
+        filters = ImmutableList.of(upperP2EqualA());
+        
Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+
+        // col1 = 1 && (p2 like 'a%' or p1 = 1) => [p2 like 'a%' or p1 = 1]
+        filters = ImmutableList.of(col1Equal1(), or(p2Like("a%"), p1Equal1()));
+        Assertions.assertThat(tableSource.pushFilters(filters))
+                .isEqualTo(ImmutableList.of(filters.get(0)));
+
+        // col1 = 1 && (p2 like '%a' or p1 = 1) => None
+        filters = ImmutableList.of(col1Equal1(), or(p2Like("%a"), p1Equal1()));
+        Assertions.assertThat(tableSource.pushFilters(filters))
+                .containsExactlyInAnyOrder(filters.toArray(new 
ResolvedExpression[0]));
+
+        // col1 = 1 && (p2 like 'a%' && p1 = 1) => [p2 like 'a%' && p1 = 1]
+        filters = ImmutableList.of(col1Equal1(), and(p2Like("a%"), 
p1Equal1()));
+        Assertions.assertThat(tableSource.pushFilters(filters))
+                .isEqualTo(ImmutableList.of(filters.get(0)));
+
+        // col1 = 1 && (p2 like '%a' && p1 = 1) => None
+        filters = ImmutableList.of(col1Equal1(), and(p2Like("%a"), 
p1Equal1()));
+        Assertions.assertThat(tableSource.pushFilters(filters))
+                .containsExactlyInAnyOrder(filters.toArray(new 
ResolvedExpression[0]));
+
+        // p2 like 'a%' && (col1 = 1 or p1 = 1) => [col1 = 1 or p1 = 1]
+        filters = ImmutableList.of(p2Like("a%"), or(col1Equal1(), p1Equal1()));
+        Assertions.assertThat(tableSource.pushFilters(filters))
+                .isEqualTo(ImmutableList.of(filters.get(1)));
+
+        // p2 like 'a%' && (col1 = 1 && p1 = 1) => [col1 = 1 && p1 = 1]
+        filters = ImmutableList.of(p2Like("a%"), and(col1Equal1(), 
p1Equal1()));
+        Assertions.assertThat(tableSource.pushFilters(filters))
+                .isEqualTo(ImmutableList.of(filters.get(1)));
+    }
+
+    private ResolvedExpression col1Equal1() {
+        return CallExpression.anonymous(
+                BuiltInFunctionDefinitions.EQUALS,
+                ImmutableList.of(
+                        new FieldReferenceExpression(
+                                "col1", 
org.apache.flink.table.api.DataTypes.INT(), 0, 0),
+                        new ValueLiteralExpression(
+                                1, 
org.apache.flink.table.api.DataTypes.INT().notNull())),
+                org.apache.flink.table.api.DataTypes.BOOLEAN());
+    }
+
+    private ResolvedExpression p1Equal1() {
+        return CallExpression.anonymous(
+                BuiltInFunctionDefinitions.EQUALS,
+                ImmutableList.of(
+                        new FieldReferenceExpression(
+                                "p1", 
org.apache.flink.table.api.DataTypes.INT(), 0, 2),
+                        new ValueLiteralExpression(
+                                1, 
org.apache.flink.table.api.DataTypes.INT().notNull())),
+                org.apache.flink.table.api.DataTypes.BOOLEAN());
+    }
+
+    private ResolvedExpression p2Like(String literal) {
+        return CallExpression.anonymous(
+                BuiltInFunctionDefinitions.LIKE,
+                ImmutableList.of(
+                        new FieldReferenceExpression(
+                                "p2", 
org.apache.flink.table.api.DataTypes.STRING(), 0, 3),
+                        new ValueLiteralExpression(
+                                literal, 
org.apache.flink.table.api.DataTypes.STRING().notNull())),
+                org.apache.flink.table.api.DataTypes.BOOLEAN());
+    }
+
+    // where rand(42) > 0.1
+    private ResolvedExpression rand() {
+        return CallExpression.anonymous(
+                BuiltInFunctionDefinitions.GREATER_THAN,
+                ImmutableList.of(
+                        CallExpression.anonymous(
+                                BuiltInFunctionDefinitions.RAND,
+                                ImmutableList.of(
+                                        new ValueLiteralExpression(
+                                                42,
+                                                
org.apache.flink.table.api.DataTypes.INT()
+                                                        .notNull())),
+                                
org.apache.flink.table.api.DataTypes.DOUBLE().notNull()),
+                        new ValueLiteralExpression(
+                                0.1, 
org.apache.flink.table.api.DataTypes.DOUBLE().notNull())),
+                org.apache.flink.table.api.DataTypes.BOOLEAN());
+    }
+
+    private ResolvedExpression upperP2EqualA() {
+        return CallExpression.anonymous(
+                BuiltInFunctionDefinitions.EQUALS,
+                ImmutableList.of(
+                        CallExpression.anonymous(
+                                BuiltInFunctionDefinitions.UPPER,
+                                ImmutableList.of(
+                                        new FieldReferenceExpression(
+                                                "p2",
+                                                
org.apache.flink.table.api.DataTypes.STRING(),
+                                                0,
+                                                3)),
+                                
org.apache.flink.table.api.DataTypes.STRING().notNull()),
+                        new ValueLiteralExpression(
+                                "A", 
org.apache.flink.table.api.DataTypes.STRING().notNull())),
+                org.apache.flink.table.api.DataTypes.BOOLEAN());
+    }
+
+    private ResolvedExpression or(ResolvedExpression e1, ResolvedExpression 
e2) {
+        return CallExpression.anonymous(
+                BuiltInFunctionDefinitions.OR,
+                ImmutableList.of(e1, e2),
+                org.apache.flink.table.api.DataTypes.BOOLEAN());
+    }
+
+    private ResolvedExpression and(ResolvedExpression e1, ResolvedExpression 
e2) {
+        return CallExpression.anonymous(
+                BuiltInFunctionDefinitions.AND,
+                ImmutableList.of(e1, e2),
+                org.apache.flink.table.api.DataTypes.BOOLEAN());
+    }
+}

Reply via email to