This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 50cd9ff0e [flink] Always return all filters to flink when trying to 
apply filters in FlinkTableSource (#1934)
50cd9ff0e is described below

commit 50cd9ff0e1af1af077fcf7d843c63917851490ef
Author: Xuyang <[email protected]>
AuthorDate: Wed Dec 24 17:06:09 2025 +0800

    [flink] Always return all filters to flink when trying to apply filters in 
FlinkTableSource (#1934)
---
 .../fluss/flink/source/FlinkTableSource.java       | 12 ++-
 .../flink/source/FlinkTableSourceBatchITCase.java  | 96 ++++++++++++++++++++--
 .../fluss/flink/source/FlinkTableSourceITCase.java | 62 +++++++++++---
 3 files changed, 146 insertions(+), 24 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 2394921f1..6acdbf840 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -536,7 +536,11 @@ public class FlinkTableSource
                 return Result.of(Collections.emptyList(), filters);
             }
             singleRowFilter = lookupRow;
-            return Result.of(acceptedFilters, remainingFilters);
+
+            // FLINK-38635 We cannot determine whether this source will 
ultimately be used as a scan
+            // source or a lookup source. Since fluss lookup sources cannot 
accept filters yet, to
+            // be safe, we return all filters to the Flink planner.
+            return Result.of(acceptedFilters, filters);
         } else if (isPartitioned()) {
             // apply partition filter pushdown
             List<Predicate> converted = new ArrayList<>();
@@ -588,7 +592,11 @@ public class FlinkTableSource
                     }
                 }
             }
-            return Result.of(acceptedFilters, remainingFilters);
+
+            // FLINK-38635 We cannot determine whether this source will 
ultimately be used as a scan
+            // source or a lookup source. Since fluss lookup sources cannot 
accept filters yet, to
+            // be safe, we return all filters to the Flink planner.
+            return Result.of(acceptedFilters, filters);
         }
 
         return Result.of(Collections.emptyList(), filters);
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
index 7f3894f96..8c0adb3a2 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
@@ -91,8 +91,8 @@ abstract class FlinkTableSourceBatchITCase extends 
FlinkTestBase {
                 .contains(
                         String.format(
                                 "TableSourceScan(table=[[testcatalog, 
defaultdb, %s, "
-                                        + "filter=[and(=(id, 1), =(name, 
_UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], "
-                                        + "project=[address]]], 
fields=[address])",
+                                        + "filter=[and(=(id, 1), =(name, 
_UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], "
+                                        + "fields=[id, address, name])",
                                 tableName));
         CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
         List<String> expected = Collections.singletonList("+I[1, address1, 
name1]");
@@ -108,8 +108,8 @@ abstract class FlinkTableSourceBatchITCase extends 
FlinkTestBase {
                 .contains(
                         String.format(
                                 "TableSourceScan(table=[[testcatalog, 
defaultdb, %s, "
-                                        + "filter=[and(=(id, 1), =(name, 
_UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], "
-                                        + "project=[address]]], 
fields=[address])",
+                                        + "filter=[and(=(id, 1), =(name, 
_UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], "
+                                        + "fields=[id, address, name])",
                                 tableName));
         CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
         List<String> expected = Collections.singletonList("+I[1, address1, 
name1]");
@@ -126,7 +126,7 @@ abstract class FlinkTableSourceBatchITCase extends 
FlinkTestBase {
                         String.format(
                                 "TableSourceScan(table=[[testcatalog, 
defaultdb, %s, "
                                         + "filter=[=(id, 1)], "
-                                        + "project=[name]]], fields=[name])",
+                                        + "project=[id, name]]], fields=[id, 
name])",
                                 tableName));
         CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
         List<String> expected = Collections.singletonList("+I[1, name1]");
@@ -149,8 +149,8 @@ abstract class FlinkTableSourceBatchITCase extends 
FlinkTestBase {
                 .contains(
                         String.format(
                                 "TableSourceScan(table=[[testcatalog, 
defaultdb, %s, "
-                                        + "filter=[and(=(id, 1), =(dt, 
_UTF-16LE'%s':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], "
-                                        + "project=[address, name]]], 
fields=[address, name])\n",
+                                        + "filter=[and(=(id, 1), =(dt, 
_UTF-16LE'%s':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], "
+                                        + "fields=[id, address, name, dt])\n",
                                 tableName, partition1));
 
         CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
@@ -159,6 +159,84 @@ abstract class FlinkTableSourceBatchITCase extends 
FlinkTestBase {
         assertResultsIgnoreOrder(collected, expected, true);
     }
 
+    @Test
+    void testFilterOnLookupSource() throws Exception {
+        String srcTableName = String.format("test_src_table_%s", 
RandomUtils.nextInt());
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + "  id int not null,"
+                                + "  name varchar,"
+                                + "  dt varchar,"
+                                + "  dim_dt varchar,"
+                                + "  primary key (id, dt) NOT ENFORCED) 
partitioned by (dt)"
+                                + " with ("
+                                + "  'bucket.num' = '4', "
+                                + "  'table.auto-partition.enabled' = 'true',"
+                                + "  'table.auto-partition.time-unit' = 
'year')",
+                        srcTableName));
+
+        String dimTableName = String.format("test_dim_table_%s", 
RandomUtils.nextInt());
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + "  id int not null,"
+                                + "  address varchar,"
+                                + "  dt varchar,"
+                                + "  primary key (id, dt) NOT ENFORCED) 
partitioned by (dt)"
+                                + " with ("
+                                + "  'bucket.num' = '4', "
+                                + "  'table.auto-partition.enabled' = 'true',"
+                                + "  'table.auto-partition.time-unit' = 
'year')",
+                        dimTableName));
+
+        TablePath srcTablePath = TablePath.of(DEFAULT_DB, srcTableName);
+        Map<Long, String> partitionNameById =
+                
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), srcTablePath);
+        // just pick first partition to insert data
+        Iterator<String> partitionIterator =
+                partitionNameById.values().stream().sorted().iterator();
+        String partition1 = partitionIterator.next();
+
+        // prepare src table data
+        try (Table srcTable = conn.getTable(srcTablePath)) {
+            UpsertWriter upsertWriter = srcTable.newUpsert().createWriter();
+            for (int i = 1; i <= 2; i++) {
+                Object[] values = new Object[] {i, "name" + i, partition1, 
partition1};
+                upsertWriter.upsert(row(values));
+            }
+            upsertWriter.flush();
+        }
+
+        TablePath dimTablePath = TablePath.of(DEFAULT_DB, dimTableName);
+        // prepare dim table data
+        try (Table dimTable = conn.getTable(dimTablePath)) {
+            UpsertWriter upsertWriter = dimTable.newUpsert().createWriter();
+            for (int i = 1; i <= 2; i++) {
+                Object[] values = new Object[] {i, "address" + i, partition1};
+                upsertWriter.upsert(row(values));
+            }
+            upsertWriter.flush();
+        }
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TEMPORARY VIEW my_view AS "
+                                + "SELECT *, proctime() as proc from %s WHERE 
id = 1 AND dt = '%s'",
+                        srcTableName, partition1));
+
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(
+                                String.format(
+                                        "SELECT src.id, src.name, h.id, 
h.address FROM my_view src "
+                                                + " LEFT JOIN %s FOR 
SYSTEM_TIME AS OF src.proc as h "
+                                                + " ON src.id = h.id and 
src.dim_dt = h.dt and h.dt <> '%s'",
+                                        dimTableName, partition1))
+                        .collect();
+        List<String> expected = Collections.singletonList("+I[1, name1, null, 
null]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+
     @Test
     void testScanSingleRowFilterException() throws Exception {
         String tableName = prepareSourceTable(new String[] {"id", "name"}, 
null);
@@ -356,8 +434,8 @@ abstract class FlinkTableSourceBatchITCase extends 
FlinkTestBase {
         }
 
         // prepare table data
-        try (Table dimTable = conn.getTable(tablePath)) {
-            UpsertWriter upsertWriter = dimTable.newUpsert().createWriter();
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
             for (int i = 1; i <= 5; i++) {
                 Object[] values =
                         partition1 == null
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index d765421bc..4d056a12d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -744,6 +744,41 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         assertResultsIgnoreOrder(collected2, expected2, true);
     }
 
+    /**
+     * lookup table with one pk, two join condition and one of the join 
condition is constant value.
+     */
+    @Test
+    void testLookupWithFilterPushDown() throws Exception {
+        String dim =
+                prepareDimTableAndSourceTable(
+                        Caching.DISABLE_CACHE, false, new String[] {"id"}, 
null, "p_date");
+
+        Map<Long, String> partitionNameById =
+                waitUntilPartitions(
+                        FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
+                        TablePath.of(DEFAULT_DB, dim));
+
+        // pick the first partition to do filter
+        String filteredPartition = 
partitionNameById.values().stream().sorted().iterator().next();
+
+        String dimJoinQuery =
+                String.format(
+                        "SELECT a, h.id, h.name, h.address FROM src "
+                                + " LEFT JOIN %s FOR SYSTEM_TIME AS OF 
src.proc as h "
+                                + " ON src.a = h.id and src.p_date = h.p_date 
and h.p_date <> '%s'",
+                        dim, filteredPartition);
+
+        CloseableIterator<Row> collected = 
tEnv.executeSql(dimJoinQuery).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, null, null, null]",
+                        "+I[2, null, null, null]",
+                        "+I[3, null, null, null]",
+                        "+I[10, null, null, null]",
+                        "+I[1, null, null, null]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+
     /**
      * lookup table with one pk, 3 join condition on dim fields, 1st for 
variable non-pk, 2nd for
      * pk, 3rd for constant value.
@@ -959,8 +994,8 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         assertThat(plan)
                 .contains(
                         "TableSourceScan(table=[[testcatalog, defaultdb, 
partitioned_table, "
-                                + "filter=[=(c, 
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
-                                + "project=[a, b]]], fields=[a, b])");
+                                + "filter=[=(c, 
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]]], "
+                                + "fields=[a, b, c])");
 
         org.apache.flink.util.CloseableIterator<Row> rowIter =
                 tEnv.executeSql("select * from partitioned_table where c 
='2025'").collect();
@@ -1013,10 +1048,11 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                                 + "=(p_string, 
_UTF-16LE'hello':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")), "
                                 + "=(p_float, 1.25E1:FLOAT)), =(p_double, 
7.88E0:DOUBLE)), =(p_date, 2025-10-12)), "
                                 + "=(p_time, 12:55:00)), =(p_ts_ntz, 
2025-10-12 12:55:00.001:TIMESTAMP(6))), "
-                                + "=(p_ts_ltz, 1970-01-01 
00:00:04.001:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))), NOT(p_bool))], "
-                                + "project=[id, p_bool, p_int]]], fields=[id, 
p_bool, p_int])")
-                // all filter conditions should be pushed down
-                .doesNotContain("where=");
+                                + "=(p_ts_ltz, 1970-01-01 
00:00:04.001:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))), NOT(p_bool))]]], "
+                                + "fields=[id, p_bool, p_int, p_bigint, 
p_bytes, p_string, p_float, p_double, p_date, p_time, p_ts_ntz, p_ts_ltz])")
+                // although all filter conditions are pushed down into source, 
they are still
+                // retained in the plan
+                .contains("where=");
 
         List<String> expectedRowValues =
                 Collections.singletonList(
@@ -1050,8 +1086,8 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         assertThat(plan)
                 .contains(
                         "TableSourceScan(table=[[testcatalog, defaultdb, 
multi_partitioned_table, "
-                                + "filter=[=(c, 
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
-                                + "project=[a, b, d]]], fields=[a, b, d])");
+                                + "filter=[=(c, 
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]]], "
+                                + "fields=[a, b, c, d])");
 
         // test partition key prefix match
         // This test requires dynamically discovering newly created 
partitions, so
@@ -1080,8 +1116,8 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                 .contains(
                         "TableSourceScan(table=[[testcatalog, defaultdb, 
multi_partitioned_table, "
                                 + "filter=[and(=(c, 
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"), "
-                                + "=(d, _UTF-16LE'3':VARCHAR(2147483647) 
CHARACTER SET \"UTF-16LE\"))], "
-                                + "project=[a, b]]], fields=[a, b])");
+                                + "=(d, _UTF-16LE'3':VARCHAR(2147483647) 
CHARACTER SET \"UTF-16LE\"))]]], "
+                                + "fields=[a, b, c, d])");
 
         // test all partition key match
         rowIter =
@@ -1128,7 +1164,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                 .contains(
                         "TableSourceScan(table=[[testcatalog, defaultdb, 
combined_filters_table, "
                                 + "filter=[=(c, 
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
-                                + "project=[a, d]]], fields=[a, d])");
+                                + "project=[a, c, d]]], fields=[a, c, d])");
 
         // test column filter、partition filter and flink runtime filter
         org.apache.flink.util.CloseableIterator<Row> rowIter =
@@ -1145,7 +1181,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                 .contains(
                         "TableSourceScan(table=[[testcatalog, defaultdb, 
combined_filters_table, "
                                 + "filter=[=(c, 
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
-                                + "project=[a, d]]], fields=[a, d])");
+                                + "project=[a, c, d]]], fields=[a, c, d])");
 
         // test column filter、partition filter and flink runtime filter
         rowIter =
@@ -1366,7 +1402,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         String plan = tEnv.explainSql(query);
         assertThat(plan)
                 .contains(
-                        "Calc(select=[3 AS a, b, c, d], where=[((a = 3) AND 
LIKE(b, '%v3%'))])\n"
+                        "Calc(select=[3 AS a, b, c, d], where=[((a = 3) AND 
((c = '2026') OR LIKE(d, '%1%')) AND LIKE(b, '%v3%'))])\n"
                                 + "+- TableSourceScan(table=[[testcatalog, 
defaultdb, partitioned_table_complex, filter=[OR(=(c, _UTF-16LE'2026'), LIKE(d, 
_UTF-16LE'%1%'))]]], fields=[a, b, c, d])");
 
         org.apache.flink.util.CloseableIterator<Row> rowIter = 
tEnv.executeSql(query).collect();

Reply via email to