This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ae15844ab [flink] Enhance filter push down to consume partition filter
(#2346)
ae15844ab is described below
commit ae15844ab97c30532a16d7ef962f46afa3293b4c
Author: Aitozi <[email protected]>
AuthorDate: Tue Nov 21 11:40:30 2023 +0800
[flink] Enhance filter push down to consume partition filter (#2346)
---
.../paimon/operation/DefaultValueAssigner.java | 8 +-
.../paimon/flink/source/DataTableSource.java | 5 +
.../paimon/flink/source/FlinkTableSource.java | 58 +++++-
.../paimon/flink/source/SystemTableSource.java | 5 +
.../flink/source/table/PushedRichTableSource.java | 3 +-
.../flink/source/table/PushedTableSource.java | 3 +-
.../paimon/flink/source/FilterPushDownITCase.java | 149 ++++++++++++++
.../paimon/flink/source/FlinkTableSourceTest.java | 219 +++++++++++++++++++++
8 files changed, 439 insertions(+), 11 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
index 730aafe8c..794423648 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
@@ -133,17 +133,17 @@ public class DefaultValueAssigner {
return Optional.of(predicate);
};
- ArrayList<Predicate> filterWithouDefaultValueField = new
ArrayList<>();
+ ArrayList<Predicate> filterWithoutDefaultValueField = new
ArrayList<>();
List<Predicate> predicates =
PredicateBuilder.splitAnd(filters);
for (Predicate predicate : predicates) {
predicate
.visit(deletePredicateWithFieldNameVisitor)
- .ifPresent(filterWithouDefaultValueField::add);
+ .ifPresent(filterWithoutDefaultValueField::add);
}
- if (!filterWithouDefaultValueField.isEmpty()) {
- result =
PredicateBuilder.and(filterWithouDefaultValueField);
+ if (!filterWithoutDefaultValueField.isEmpty()) {
+ result =
PredicateBuilder.and(filterWithoutDefaultValueField);
} else {
result = null;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index cb26a76a8..208691477 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -344,6 +344,11 @@ public class DataTableSource extends FlinkTableSource {
this.dynamicPartitionFilteringFields = candidateFilterFields;
}
+ @Override
+ public boolean isStreaming() {
+ return streaming;
+ }
+
/** Split statistics for inferring row count and parallelism size. */
protected static class SplitStatistics {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index ac2ef6ce9..c35b46444 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -20,8 +20,11 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.table.Table;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -35,15 +38,20 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
/** A Flink {@link ScanTableSource} for paimon. */
public abstract class FlinkTableSource {
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkTableSource.class);
+
protected final Table table;
@Nullable protected Predicate predicate;
@@ -65,13 +73,55 @@ public abstract class FlinkTableSource {
this.limit = limit;
}
- public void pushFilters(List<ResolvedExpression> filters) {
- List<Predicate> converted = new ArrayList<>();
+ /** @return The unconsumed filters. */
+ public List<ResolvedExpression> pushFilters(List<ResolvedExpression>
filters) {
+ List<String> partitionKeys = table.partitionKeys();
RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType());
+
+ // The source must ensure the consumed filters are fully evaluated,
otherwise the result
+ // of query will be wrong.
+ List<ResolvedExpression> unConsumedFilters = new ArrayList<>();
+ List<ResolvedExpression> consumedFilters = new ArrayList<>();
+ List<Predicate> converted = new ArrayList<>();
+ PredicateVisitor<Boolean> visitor =
+ new PredicateVisitor<Boolean>() {
+ @Override
+ public Boolean visit(LeafPredicate predicate) {
+ return partitionKeys.contains(predicate.fieldName());
+ }
+
+ @Override
+ public Boolean visit(CompoundPredicate predicate) {
+ for (Predicate child : predicate.children()) {
+ Boolean matched = child.visit(this);
+
+ if (!matched) {
+ return false;
+ }
+ }
+ return true;
+ }
+ };
+
for (ResolvedExpression filter : filters) {
- PredicateConverter.convert(rowType,
filter).ifPresent(converted::add);
+ Optional<Predicate> predicateOptional =
PredicateConverter.convert(rowType, filter);
+
+ if (!predicateOptional.isPresent()) {
+ unConsumedFilters.add(filter);
+ } else {
+ Predicate p = predicateOptional.get();
+ if (isStreaming() || !p.visit(visitor)) {
+ unConsumedFilters.add(filter);
+ } else {
+ consumedFilters.add(filter);
+ }
+ converted.add(p);
+ }
}
predicate = converted.isEmpty() ? null :
PredicateBuilder.and(converted);
+ LOG.info("Consumed filters: {} of {}", consumedFilters, filters);
+
+ return unConsumedFilters;
}
public void pushProjection(int[][] projectedFields) {
@@ -99,4 +149,6 @@ public abstract class FlinkTableSource {
public abstract List<String> listAcceptedFilterFields();
public abstract void applyDynamicFiltering(List<String>
candidateFilterFields);
+
+ public abstract boolean isStreaming();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index b6cc5d85d..b577a73aa 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -134,4 +134,9 @@ public class SystemTableSource extends FlinkTableSource {
"Cannot apply dynamic filtering to Paimon system table
'%s'.",
table.name()));
}
+
+ @Override
+ public boolean isStreaming() {
+ return isStreamingMode;
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
index 9d5083dda..7e55b83f6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
@@ -45,8 +45,7 @@ public class PushedRichTableSource extends RichTableSource
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
- source.pushFilters(filters);
- return Result.of(filters, filters);
+ return Result.of(filters, source.pushFilters(filters));
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
index c5bb17acf..a1389b5bf 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
@@ -45,8 +45,7 @@ public class PushedTableSource extends BaseTableSource
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
- source.pushFilters(filters);
- return Result.of(filters, filters);
+ return Result.of(filters, source.pushFilters(filters));
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java
new file mode 100644
index 000000000..03c918ebc
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.ExplainFormat;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for filter push down. */
+public class FilterPushDownITCase extends CatalogITCaseBase {
+
+ @Override
+ public List<String> ddl() {
+ return ImmutableList.of("CREATE TABLE T (" + "a INT, b INT, c STRING)
PARTITIONED BY (a);");
+ }
+
+ @BeforeEach
+ @Override
+ public void before() throws IOException {
+ super.before();
+ batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2'), (2, 3, '3'),
(3, 3, '3')");
+ }
+
+ @Test
+ public void testPartitionConditionConsuming_OnePartitionCondition() {
+ String sql = "SELECT * FROM T where a = 1 limit 1";
+ assertPlanAndResult(
+ sql,
+ "+- Limit(offset=[0], fetch=[1], global=[false])\n"
+ + "+- TableSourceScan(table=[[PAIMON, default, T,
filter=[=(a, 1)], project=[b, c], limit=[1]]], fields=[b, c])",
+ Row.ofKind(RowKind.INSERT, 1, 1, "1"));
+ }
+
+ @Test
+ public void testPartitionConditionConsuming_PartitionConditionAndOther() {
+ String sql = "SELECT * FROM T where (a = 1 or a = 2) and c = '1' limit
1";
+ // c = '1' is not consumed and limit 1 is not pushed to the source
+ assertPlanAndResult(
+ sql,
+ "+- Calc(select=[a, b, CAST('1' AS VARCHAR(2147483647)) AS c],
where=[(c = '1')])\n"
+ + "+- TableSourceScan(table=[[PAIMON, default, T,
filter=[and(OR(=(a, 1), =(a, 2)), =(c, _UTF-16LE'1':VARCHAR(2147483647)
CHARACTER SET \"UTF-16LE\"))]]], fields=[a, b, c])",
+ Row.ofKind(RowKind.INSERT, 1, 1, "1"));
+ }
+
+ @Test
+ public void testPartitionConditionNotConsuming1() {
+ // a + 1 = 2 not consumed
+ String sql = "SELECT * FROM T where a + 1 = 2 limit 1";
+ assertPlanAndResult(
+ sql,
+ "+- Calc(select=[a, b, c], where=[((a + 1) = 2)])\n"
+ + "+- TableSourceScan(table=[[PAIMON, default, T,
filter=[=(+(a, 1), 2)]]], fields=[a, b, c])",
+ Row.ofKind(RowKind.INSERT, 1, 1, "1"));
+ }
+
+ @Test
+ public void testPartitionConditionNotConsuming2() {
+ // UNIX_TIMESTAMP() > 0 not consumed
+ String sql = "SELECT * FROM T where UNIX_TIMESTAMP() > 0";
+ assertPlanAndResult(
+ sql,
+ "Calc(select=[a, b, c], where=[(UNIX_TIMESTAMP() > 0)])\n"
+ + "+- TableSourceScan(table=[[PAIMON, default, T,
filter=[>(UNIX_TIMESTAMP(), 0)]]], fields=[a, b, c])",
+ Row.ofKind(RowKind.INSERT, 1, 1, "1"),
+ Row.ofKind(RowKind.INSERT, 1, 2, "2"),
+ Row.ofKind(RowKind.INSERT, 2, 3, "3"),
+ Row.ofKind(RowKind.INSERT, 3, 3, "3"));
+ }
+
+ @Test
+ public void testPartitionConditionNotConsuming3() {
+ // all not consumed
+ String sql = "SELECT * FROM T where b = 3 and ( a = 2 or c = '3')";
+ assertPlanAndResult(
+ sql,
+ "Calc(select=[a, CAST(3 AS INTEGER) AS b, c], where=[((b = 3)
AND ((a = 2) OR (c = '3')))])\n"
+ + "+- TableSourceScan(table=[[PAIMON, default, T,
filter=[and(=(b, 3), OR(=(a, 2), =(c, _UTF-16LE'3':VARCHAR(2147483647)
CHARACTER SET \"UTF-16LE\")))]]], fields=[a, b, c])",
+ Row.ofKind(RowKind.INSERT, 2, 3, "3"),
+ Row.ofKind(RowKind.INSERT, 3, 3, "3"));
+ }
+
+ @Test
+ public void testStreamingReadingNotConsumePartitionCondition() throws
TimeoutException {
+ String sql = "SELECT * FROM T WHERE a = 5";
+ String plan = sEnv.explainSql(sql, ExplainFormat.TEXT);
+ Assertions.assertThat(plan)
+ .contains(
+ "Calc(select=[CAST(5 AS INTEGER) AS a, b, c],
where=[(a = 5)])\n"
+ + "+- TableSourceScan(table=[[PAIMON, default,
T, filter=[=(a, 5)]]], fields=[a, b, c])");
+
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(sql).collect());
+ sql("INSERT INTO T VALUES (5, 5, '5'), (6, 6, '6'), (5, 5, '5_1')");
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of(5, 5, "5"), Row.of(5, 5,
"5_1"));
+ }
+
+ @Test
+ public void testPartitionCondition_ProjectionPushDown() {
+ String sql = "SELECT b, a FROM T where a = 1 limit 1";
+ assertPlanAndResult(
+ sql,
+ "+- Limit(offset=[0], fetch=[1], global=[false])\n"
+ + "+- TableSourceScan(table=[[PAIMON, default, T,
filter=[=(a, 1)], project=[b], limit=[1]]], fields=[b])",
+ Row.ofKind(RowKind.INSERT, 1, 1));
+ }
+
+ private void assertPlanAndResult(String sql, String planIdentifier, Row...
expectedRows) {
+ String plan = tEnv.explainSql(sql, ExplainFormat.TEXT);
+ String[] lines = plan.split("\n");
+ String trimmed =
Arrays.stream(lines).map(String::trim).collect(Collectors.joining("\n"));
+ Assertions.assertThat(trimmed).contains(planIdentifier);
+ List<Row> result = batchSql(sql);
+ Assertions.assertThat(result).containsExactlyInAnyOrder(expectedRows);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
new file mode 100644
index 000000000..350fe9e3a
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
@@ -0,0 +1,219 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/** Test for {@link FlinkTableSource}. */
+public class FlinkTableSourceTest extends TableTestBase {
+
+ @Test
+ public void testApplyFilterNonPartitionTable() throws Exception {
+ FileIO fileIO = LocalFileIO.create();
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
+ Schema schema = Schema.newBuilder().column("col1",
DataTypes.INT()).build();
+ TableSchema tableSchema = new SchemaManager(fileIO,
tablePath).createTable(schema);
+ Table table = FileStoreTableFactory.create(LocalFileIO.create(),
tablePath, tableSchema);
+ FlinkTableSource tableSource =
+ new DataTableSource(
+ ObjectIdentifier.of("catalog1", "db1", "T"), table,
false, null, null);
+
+ // col1 = 1
+ List<ResolvedExpression> filters = ImmutableList.of(col1Equal1());
+
Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+ }
+
+ @Test
+ public void testApplyPartitionTable() throws Exception {
+ FileIO fileIO = LocalFileIO.create();
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
+ Schema schema =
+ Schema.newBuilder()
+ .column("col1", DataTypes.INT())
+ .column("col2", DataTypes.INT())
+ .column("p1", DataTypes.INT())
+ .column("p2", DataTypes.STRING())
+ .partitionKeys("p1", "p2")
+ .build();
+ TableSchema tableSchema = new SchemaManager(fileIO,
tablePath).createTable(schema);
+ Table table = FileStoreTableFactory.create(LocalFileIO.create(),
tablePath, tableSchema);
+ FlinkTableSource tableSource =
+ new DataTableSource(
+ ObjectIdentifier.of("catalog1", "db1", "T"), table,
false, null, null);
+
+ // col1 = 1 && p1 = 1 => [p1 = 1]
+ List<ResolvedExpression> filters = ImmutableList.of(col1Equal1(),
p1Equal1());
+ Assertions.assertThat(tableSource.pushFilters(filters))
+ .isEqualTo(ImmutableList.of(filters.get(0)));
+
+ // p2 like '%a' => None
+ filters = ImmutableList.of(p2Like("%a"));
+
Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+
+ // col1 = 1 && p2 like 'a%' => [p2 like 'a%']
+ filters = ImmutableList.of(col1Equal1(), p2Like("a%"));
+ Assertions.assertThat(tableSource.pushFilters(filters))
+ .isEqualTo(ImmutableList.of(filters.get(0)));
+
+ // rand(42) > 0.1 => None
+ filters = ImmutableList.of(rand());
+
Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+
+ // upper(p2) = "A" => None
+ filters = ImmutableList.of(upperP2EqualA());
+
Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+
+ // col1 = 1 && (p2 like 'a%' or p1 = 1) => [p2 like 'a%' or p1 = 1]
+ filters = ImmutableList.of(col1Equal1(), or(p2Like("a%"), p1Equal1()));
+ Assertions.assertThat(tableSource.pushFilters(filters))
+ .isEqualTo(ImmutableList.of(filters.get(0)));
+
+ // col1 = 1 && (p2 like '%a' or p1 = 1) => None
+ filters = ImmutableList.of(col1Equal1(), or(p2Like("%a"), p1Equal1()));
+ Assertions.assertThat(tableSource.pushFilters(filters))
+ .containsExactlyInAnyOrder(filters.toArray(new
ResolvedExpression[0]));
+
+ // col1 = 1 && (p2 like 'a%' && p1 = 1) => [p2 like 'a%' && p1 = 1]
+ filters = ImmutableList.of(col1Equal1(), and(p2Like("a%"),
p1Equal1()));
+ Assertions.assertThat(tableSource.pushFilters(filters))
+ .isEqualTo(ImmutableList.of(filters.get(0)));
+
+ // col1 = 1 && (p2 like '%a' && p1 = 1) => None
+ filters = ImmutableList.of(col1Equal1(), and(p2Like("%a"),
p1Equal1()));
+ Assertions.assertThat(tableSource.pushFilters(filters))
+ .containsExactlyInAnyOrder(filters.toArray(new
ResolvedExpression[0]));
+
+ // p2 like 'a%' && (col1 = 1 or p1 = 1) => [p2 like 'a%']
+ filters = ImmutableList.of(p2Like("a%"), or(col1Equal1(), p1Equal1()));
+ Assertions.assertThat(tableSource.pushFilters(filters))
+ .isEqualTo(ImmutableList.of(filters.get(1)));
+
+ // p2 like 'a%' && (col1 = 1 && p1 = 1) => [p2 like 'a%']
+ filters = ImmutableList.of(p2Like("a%"), and(col1Equal1(),
p1Equal1()));
+ Assertions.assertThat(tableSource.pushFilters(filters))
+ .isEqualTo(ImmutableList.of(filters.get(1)));
+ }
+
+ private ResolvedExpression col1Equal1() {
+ return CallExpression.anonymous(
+ BuiltInFunctionDefinitions.EQUALS,
+ ImmutableList.of(
+ new FieldReferenceExpression(
+ "col1",
org.apache.flink.table.api.DataTypes.INT(), 0, 0),
+ new ValueLiteralExpression(
+ 1,
org.apache.flink.table.api.DataTypes.INT().notNull())),
+ org.apache.flink.table.api.DataTypes.BOOLEAN());
+ }
+
+ private ResolvedExpression p1Equal1() {
+ return CallExpression.anonymous(
+ BuiltInFunctionDefinitions.EQUALS,
+ ImmutableList.of(
+ new FieldReferenceExpression(
+ "p1",
org.apache.flink.table.api.DataTypes.INT(), 0, 2),
+ new ValueLiteralExpression(
+ 1,
org.apache.flink.table.api.DataTypes.INT().notNull())),
+ org.apache.flink.table.api.DataTypes.BOOLEAN());
+ }
+
+ private ResolvedExpression p2Like(String literal) {
+ return CallExpression.anonymous(
+ BuiltInFunctionDefinitions.LIKE,
+ ImmutableList.of(
+ new FieldReferenceExpression(
+ "p2",
org.apache.flink.table.api.DataTypes.STRING(), 0, 3),
+ new ValueLiteralExpression(
+ literal,
org.apache.flink.table.api.DataTypes.STRING().notNull())),
+ org.apache.flink.table.api.DataTypes.BOOLEAN());
+ }
+
+ // where rand(42) > 0.1
+ private ResolvedExpression rand() {
+ return CallExpression.anonymous(
+ BuiltInFunctionDefinitions.GREATER_THAN,
+ ImmutableList.of(
+ CallExpression.anonymous(
+ BuiltInFunctionDefinitions.RAND,
+ ImmutableList.of(
+ new ValueLiteralExpression(
+ 42,
+
org.apache.flink.table.api.DataTypes.INT()
+ .notNull())),
+
org.apache.flink.table.api.DataTypes.DOUBLE().notNull()),
+ new ValueLiteralExpression(
+ 0.1,
org.apache.flink.table.api.DataTypes.DOUBLE().notNull())),
+ org.apache.flink.table.api.DataTypes.BOOLEAN());
+ }
+
+ private ResolvedExpression upperP2EqualA() {
+ return CallExpression.anonymous(
+ BuiltInFunctionDefinitions.EQUALS,
+ ImmutableList.of(
+ CallExpression.anonymous(
+ BuiltInFunctionDefinitions.UPPER,
+ ImmutableList.of(
+ new FieldReferenceExpression(
+ "p2",
+
org.apache.flink.table.api.DataTypes.STRING(),
+ 0,
+ 3)),
+
org.apache.flink.table.api.DataTypes.STRING().notNull()),
+ new ValueLiteralExpression(
+ "A",
org.apache.flink.table.api.DataTypes.STRING().notNull())),
+ org.apache.flink.table.api.DataTypes.BOOLEAN());
+ }
+
+ private ResolvedExpression or(ResolvedExpression e1, ResolvedExpression
e2) {
+ return CallExpression.anonymous(
+ BuiltInFunctionDefinitions.OR,
+ ImmutableList.of(e1, e2),
+ org.apache.flink.table.api.DataTypes.BOOLEAN());
+ }
+
+ private ResolvedExpression and(ResolvedExpression e1, ResolvedExpression
e2) {
+ return CallExpression.anonymous(
+ BuiltInFunctionDefinitions.AND,
+ ImmutableList.of(e1, e2),
+ org.apache.flink.table.api.DataTypes.BOOLEAN());
+ }
+}