This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 50cd9ff0e [flink] Always return all filters to flink when trying to
apply filters in FlinkTableSource (#1934)
50cd9ff0e is described below
commit 50cd9ff0e1af1af077fcf7d843c63917851490ef
Author: Xuyang <[email protected]>
AuthorDate: Wed Dec 24 17:06:09 2025 +0800
[flink] Always return all filters to flink when trying to apply filters in
FlinkTableSource (#1934)
---
.../fluss/flink/source/FlinkTableSource.java | 12 ++-
.../flink/source/FlinkTableSourceBatchITCase.java | 96 ++++++++++++++++++++--
.../fluss/flink/source/FlinkTableSourceITCase.java | 62 +++++++++++---
3 files changed, 146 insertions(+), 24 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 2394921f1..6acdbf840 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -536,7 +536,11 @@ public class FlinkTableSource
return Result.of(Collections.emptyList(), filters);
}
singleRowFilter = lookupRow;
- return Result.of(acceptedFilters, remainingFilters);
+
+ // FLINK-38635 We cannot determine whether this source will
ultimately be used as a scan
+ // source or a lookup source. Since fluss lookup sources cannot
accept filters yet, to
+ // be safe, we return all filters to the Flink planner.
+ return Result.of(acceptedFilters, filters);
} else if (isPartitioned()) {
// apply partition filter pushdown
List<Predicate> converted = new ArrayList<>();
@@ -588,7 +592,11 @@ public class FlinkTableSource
}
}
}
- return Result.of(acceptedFilters, remainingFilters);
+
+ // FLINK-38635 We cannot determine whether this source will
ultimately be used as a scan
+ // source or a lookup source. Since fluss lookup sources cannot
accept filters yet, to
+ // be safe, we return all filters to the Flink planner.
+ return Result.of(acceptedFilters, filters);
}
return Result.of(Collections.emptyList(), filters);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
index 7f3894f96..8c0adb3a2 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
@@ -91,8 +91,8 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
.contains(
String.format(
"TableSourceScan(table=[[testcatalog,
defaultdb, %s, "
- + "filter=[and(=(id, 1), =(name,
_UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], "
- + "project=[address]]],
fields=[address])",
+ + "filter=[and(=(id, 1), =(name,
_UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], "
+ + "fields=[id, address, name])",
tableName));
CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
List<String> expected = Collections.singletonList("+I[1, address1,
name1]");
@@ -108,8 +108,8 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
.contains(
String.format(
"TableSourceScan(table=[[testcatalog,
defaultdb, %s, "
- + "filter=[and(=(id, 1), =(name,
_UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], "
- + "project=[address]]],
fields=[address])",
+ + "filter=[and(=(id, 1), =(name,
_UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], "
+ + "fields=[id, address, name])",
tableName));
CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
List<String> expected = Collections.singletonList("+I[1, address1,
name1]");
@@ -126,7 +126,7 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
String.format(
"TableSourceScan(table=[[testcatalog,
defaultdb, %s, "
+ "filter=[=(id, 1)], "
- + "project=[name]]], fields=[name])",
+ + "project=[id, name]]], fields=[id,
name])",
tableName));
CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
List<String> expected = Collections.singletonList("+I[1, name1]");
@@ -149,8 +149,8 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
.contains(
String.format(
"TableSourceScan(table=[[testcatalog,
defaultdb, %s, "
- + "filter=[and(=(id, 1), =(dt,
_UTF-16LE'%s':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], "
- + "project=[address, name]]],
fields=[address, name])\n",
+ + "filter=[and(=(id, 1), =(dt,
_UTF-16LE'%s':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], "
+ + "fields=[id, address, name, dt])\n",
tableName, partition1));
CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
@@ -159,6 +159,84 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
assertResultsIgnoreOrder(collected, expected, true);
}
+ @Test
+ void testFilterOnLookupSource() throws Exception {
+ String srcTableName = String.format("test_src_table_%s",
RandomUtils.nextInt());
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " id int not null,"
+ + " name varchar,"
+ + " dt varchar,"
+ + " dim_dt varchar,"
+ + " primary key (id, dt) NOT ENFORCED)
partitioned by (dt)"
+ + " with ("
+ + " 'bucket.num' = '4', "
+ + " 'table.auto-partition.enabled' = 'true',"
+ + " 'table.auto-partition.time-unit' =
'year')",
+ srcTableName));
+
+ String dimTableName = String.format("test_dim_table_%s",
RandomUtils.nextInt());
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " id int not null,"
+ + " address varchar,"
+ + " dt varchar,"
+ + " primary key (id, dt) NOT ENFORCED)
partitioned by (dt)"
+ + " with ("
+ + " 'bucket.num' = '4', "
+ + " 'table.auto-partition.enabled' = 'true',"
+ + " 'table.auto-partition.time-unit' =
'year')",
+ dimTableName));
+
+ TablePath srcTablePath = TablePath.of(DEFAULT_DB, srcTableName);
+ Map<Long, String> partitionNameById =
+
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), srcTablePath);
+ // just pick first partition to insert data
+ Iterator<String> partitionIterator =
+ partitionNameById.values().stream().sorted().iterator();
+ String partition1 = partitionIterator.next();
+
+ // prepare src table data
+ try (Table srcTable = conn.getTable(srcTablePath)) {
+ UpsertWriter upsertWriter = srcTable.newUpsert().createWriter();
+ for (int i = 1; i <= 2; i++) {
+ Object[] values = new Object[] {i, "name" + i, partition1,
partition1};
+ upsertWriter.upsert(row(values));
+ }
+ upsertWriter.flush();
+ }
+
+ TablePath dimTablePath = TablePath.of(DEFAULT_DB, dimTableName);
+ // prepare dim table data
+ try (Table dimTable = conn.getTable(dimTablePath)) {
+ UpsertWriter upsertWriter = dimTable.newUpsert().createWriter();
+ for (int i = 1; i <= 2; i++) {
+ Object[] values = new Object[] {i, "address" + i, partition1};
+ upsertWriter.upsert(row(values));
+ }
+ upsertWriter.flush();
+ }
+
+ tEnv.executeSql(
+ String.format(
+ "CREATE TEMPORARY VIEW my_view AS "
+ + "SELECT *, proctime() as proc from %s WHERE
id = 1 AND dt = '%s'",
+ srcTableName, partition1));
+
+ CloseableIterator<Row> collected =
+ tEnv.executeSql(
+ String.format(
+ "SELECT src.id, src.name, h.id,
h.address FROM my_view src "
+ + " LEFT JOIN %s FOR
SYSTEM_TIME AS OF src.proc as h "
+ + " ON src.id = h.id and
src.dim_dt = h.dt and h.dt <> '%s'",
+ dimTableName, partition1))
+ .collect();
+ List<String> expected = Collections.singletonList("+I[1, name1, null,
null]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
@Test
void testScanSingleRowFilterException() throws Exception {
String tableName = prepareSourceTable(new String[] {"id", "name"},
null);
@@ -356,8 +434,8 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
}
// prepare table data
- try (Table dimTable = conn.getTable(tablePath)) {
- UpsertWriter upsertWriter = dimTable.newUpsert().createWriter();
+ try (Table table = conn.getTable(tablePath)) {
+ UpsertWriter upsertWriter = table.newUpsert().createWriter();
for (int i = 1; i <= 5; i++) {
Object[] values =
partition1 == null
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index d765421bc..4d056a12d 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -744,6 +744,41 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
assertResultsIgnoreOrder(collected2, expected2, true);
}
+ /**
+ * lookup table with one pk, two join conditions, where one of the join
conditions is a constant value.
+ */
+ @Test
+ void testLookupWithFilterPushDown() throws Exception {
+ String dim =
+ prepareDimTableAndSourceTable(
+ Caching.DISABLE_CACHE, false, new String[] {"id"},
null, "p_date");
+
+ Map<Long, String> partitionNameById =
+ waitUntilPartitions(
+ FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
+ TablePath.of(DEFAULT_DB, dim));
+
+ // pick the first partition to do filter
+ String filteredPartition =
partitionNameById.values().stream().sorted().iterator().next();
+
+ String dimJoinQuery =
+ String.format(
+ "SELECT a, h.id, h.name, h.address FROM src "
+ + " LEFT JOIN %s FOR SYSTEM_TIME AS OF
src.proc as h "
+ + " ON src.a = h.id and src.p_date = h.p_date
and h.p_date <> '%s'",
+ dim, filteredPartition);
+
+ CloseableIterator<Row> collected =
tEnv.executeSql(dimJoinQuery).collect();
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, null, null, null]",
+ "+I[2, null, null, null]",
+ "+I[3, null, null, null]",
+ "+I[10, null, null, null]",
+ "+I[1, null, null, null]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
/**
* lookup table with one pk, 3 join condition on dim fields, 1st for
variable non-pk, 2nd for
* pk, 3rd for constant value.
@@ -959,8 +994,8 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
assertThat(plan)
.contains(
"TableSourceScan(table=[[testcatalog, defaultdb,
partitioned_table, "
- + "filter=[=(c,
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
- + "project=[a, b]]], fields=[a, b])");
+ + "filter=[=(c,
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]]], "
+ + "fields=[a, b, c])");
org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from partitioned_table where c
='2025'").collect();
@@ -1013,10 +1048,11 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
+ "=(p_string,
_UTF-16LE'hello':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")), "
+ "=(p_float, 1.25E1:FLOAT)), =(p_double,
7.88E0:DOUBLE)), =(p_date, 2025-10-12)), "
+ "=(p_time, 12:55:00)), =(p_ts_ntz,
2025-10-12 12:55:00.001:TIMESTAMP(6))), "
- + "=(p_ts_ltz, 1970-01-01
00:00:04.001:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))), NOT(p_bool))], "
- + "project=[id, p_bool, p_int]]], fields=[id,
p_bool, p_int])")
- // all filter conditions should be pushed down
- .doesNotContain("where=");
+ + "=(p_ts_ltz, 1970-01-01
00:00:04.001:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))), NOT(p_bool))]]], "
+ + "fields=[id, p_bool, p_int, p_bigint,
p_bytes, p_string, p_float, p_double, p_date, p_time, p_ts_ntz, p_ts_ltz])")
+ // although all filter conditions are pushed down into source,
they are still
+ // retained in the plan
+ .contains("where=");
List<String> expectedRowValues =
Collections.singletonList(
@@ -1050,8 +1086,8 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
assertThat(plan)
.contains(
"TableSourceScan(table=[[testcatalog, defaultdb,
multi_partitioned_table, "
- + "filter=[=(c,
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
- + "project=[a, b, d]]], fields=[a, b, d])");
+ + "filter=[=(c,
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]]], "
+ + "fields=[a, b, c, d])");
// test partition key prefix match
// This test requires dynamically discovering newly created
partitions, so
@@ -1080,8 +1116,8 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
.contains(
"TableSourceScan(table=[[testcatalog, defaultdb,
multi_partitioned_table, "
+ "filter=[and(=(c,
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"), "
- + "=(d, _UTF-16LE'3':VARCHAR(2147483647)
CHARACTER SET \"UTF-16LE\"))], "
- + "project=[a, b]]], fields=[a, b])");
+ + "=(d, _UTF-16LE'3':VARCHAR(2147483647)
CHARACTER SET \"UTF-16LE\"))]]], "
+ + "fields=[a, b, c, d])");
// test all partition key match
rowIter =
@@ -1128,7 +1164,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
.contains(
"TableSourceScan(table=[[testcatalog, defaultdb,
combined_filters_table, "
+ "filter=[=(c,
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
- + "project=[a, d]]], fields=[a, d])");
+ + "project=[a, c, d]]], fields=[a, c, d])");
// test column filter、partition filter and flink runtime filter
org.apache.flink.util.CloseableIterator<Row> rowIter =
@@ -1145,7 +1181,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
.contains(
"TableSourceScan(table=[[testcatalog, defaultdb,
combined_filters_table, "
+ "filter=[=(c,
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
- + "project=[a, d]]], fields=[a, d])");
+ + "project=[a, c, d]]], fields=[a, c, d])");
// test column filter、partition filter and flink runtime filter
rowIter =
@@ -1366,7 +1402,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
String plan = tEnv.explainSql(query);
assertThat(plan)
.contains(
- "Calc(select=[3 AS a, b, c, d], where=[((a = 3) AND
LIKE(b, '%v3%'))])\n"
+ "Calc(select=[3 AS a, b, c, d], where=[((a = 3) AND
((c = '2026') OR LIKE(d, '%1%')) AND LIKE(b, '%v3%'))])\n"
+ "+- TableSourceScan(table=[[testcatalog,
defaultdb, partitioned_table_complex, filter=[OR(=(c, _UTF-16LE'2026'), LIKE(d,
_UTF-16LE'%1%'))]]], fields=[a, b, c, d])");
org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql(query).collect();