Copilot commented on code in PR #2338:
URL: https://github.com/apache/fluss/pull/2338#discussion_r2684761230


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -325,6 +325,224 @@ void testTableProjectPushDown(String mode) throws 
Exception {
         assertThat(actual).containsExactlyElementsOf(expected);
     }
 
+    @Test
+    void testProjectPushDownWithComplexTypes() throws Exception {
+        // Test projection pushdown for tables with Array, Map, and Row types
+        // This test verifies support for issue #1979
+        tEnv.executeSql(
+                "create table complex_type_projection_test ("
+                        + "a bigint, "
+                        + "b string, "
+                        + "c map<string, int>, "
+                        + "d row<d1 int, d2 string>, "
+                        + "e array<string>, "
+                        + "f int"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO complex_type_projection_test VALUES "
+                                + "(1, 'value1', MAP['key1', 10], ROW(100, 
'row1'), ARRAY['a', 'b'], 1000), "
+                                + "(2, 'value2', MAP['key2', 20], ROW(200, 
'row2'), ARRAY['c', 'd'], 2000), "
+                                + "(3, 'value3', MAP['key3', 30], ROW(300, 
'row3'), ARRAY['e', 'f'], 3000)")
+                .await();
+
+        // Test projection: select a, c, d, f (including complex types)
+        String query = "select a, c, d, f from complex_type_projection_test";
+
+        // Verify projection pushdown in the execution plan
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
complex_type_projection_test, project=[a, c, d, f]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, {key1=10}, +I[100, row1], 1000]",
+                        "+I[2, {key2=20}, +I[200, row2], 2000]",
+                        "+I[3, {key3=30}, +I[300, row3], 3000]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithArrayTypes() throws Exception {
+        // Test projection focusing on array types
+        tEnv.executeSql(
+                "create table array_projection_test ("
+                        + "id int, "
+                        + "int_array array<int>, "
+                        + "string_array array<string>, "
+                        + "name string"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO array_projection_test VALUES "
+                                + "(1, ARRAY[1, 2, 3], ARRAY['a', 'b', 'c'], 
'name1'), "
+                                + "(2, ARRAY[4, 5], ARRAY['d', 'e'], 'name2'), 
"
+                                + "(3, ARRAY[6, 7, 8, 9], ARRAY['f'], 
'name3')")
+                .await();
+
+        // Test projection: select id, int_array, string_array
+        String query = "select id, int_array, string_array from 
array_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
array_projection_test, project=[id, int_array, string_array]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, [1, 2, 3], [a, b, c]]",
+                        "+I[2, [4, 5], [d, e]]",
+                        "+I[3, [6, 7, 8, 9], [f]]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithMapTypes() throws Exception {
+        // Test projection focusing on map types
+        tEnv.executeSql(
+                "create table map_projection_test ("
+                        + "id int, "
+                        + "map1 map<string, int>, "
+                        + "map2 map<int, string>, "
+                        + "description string"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO map_projection_test VALUES "
+                                + "(1, MAP['k1', 10, 'k2', 20], MAP[1, 'v1', 
2, 'v2'], 'desc1'), "
+                                + "(2, MAP['k3', 30], MAP[3, 'v3'], 'desc2'), "
+                                + "(3, MAP['k4', 40, 'k5', 50, 'k6', 60], 
MAP[4, 'v4'], 'desc3')")
+                .await();
+
+        // Test projection: select id, map1, map2
+        String query = "select id, map1, map2 from map_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
map_projection_test, project=[id, map1, map2]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, {k1=10, k2=20}, {1=v1, 2=v2}]",
+                        "+I[2, {k3=30}, {3=v3}]",
+                        "+I[3, {k4=40, k5=50, k6=60}, {4=v4}]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithRowTypes() throws Exception {
+        // Test projection focusing on row types
+        tEnv.executeSql(
+                "create table row_projection_test ("
+                        + "id int, "
+                        + "simple_row row<x int, y string>, "
+                        + "nested_row row<a int, b row<c int, d string>>, "
+                        + "val bigint"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO row_projection_test VALUES "
+                                + "(1, ROW(10, 'str1'), ROW(100, ROW(1000, 
'nested1')), 1000), "
+                                + "(2, ROW(20, 'str2'), ROW(200, ROW(2000, 
'nested2')), 2000), "
+                                + "(3, ROW(30, 'str3'), ROW(300, ROW(3000, 
'nested3')), 3000)")
+                .await();
+
+        // Test projection: select id, simple_row, nested_row
+        String query = "select id, simple_row, nested_row from 
row_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
row_projection_test, project=[id, simple_row, nested_row]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, +I[10, str1], +I[100, +I[1000, nested1]]]",
+                        "+I[2, +I[20, str2], +I[200, +I[2000, nested2]]]",
+                        "+I[3, +I[30, str3], +I[300, +I[3000, nested3]]]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithNullComplexTypes() throws Exception {
+        // Test projection with null complex type values
+        tEnv.executeSql(
+                "create table null_complex_test ("
+                        + "id int, "
+                        + "arr array<int>, "
+                        + "mp map<string, int>, "
+                        + "rw row<a int, b string>, "
+                        + "name string"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO null_complex_test VALUES "
+                                + "(1, ARRAY[1, 2], MAP['k1', 1], ROW(10, 
'test'), 'name1'), "
+                                + "(2, CAST(NULL AS ARRAY<INT>), MAP['k2', 2], 
ROW(20, 'test2'), 'name2'), "
+                                + "(3, ARRAY[3], CAST(NULL AS MAP<STRING, 
INT>), ROW(30, 'test3'), 'name3'), "
+                                + "(4, ARRAY[4], MAP['k4', 4], CAST(NULL AS 
ROW<a INT, b STRING>), 'name4')")
+                .await();
+
+        // Test projection: select id, arr, mp, rw
+        String query = "select id, arr, mp, rw from null_complex_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
null_complex_test, project=[id, arr, mp, rw]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, [1, 2], {k1=1}, +I[10, test]]",
+                        "+I[2, null, {k2=2}, +I[20, test2]]",
+                        "+I[3, [3], null, +I[30, test3]]",
+                        "+I[4, [4], {k4=4}, null]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithComplexTypesInLogTable() throws Exception {
+        // Test projection with complex types in log table
+        String tableName = "complex_log_table";
+
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + "id int, "
+                                + "tags array<string>, "
+                                + "metadata map<string, int>, "
+                                + "address row<city string, zipcode int>, "
+                                + "score double"
+                                + ") with ('bucket.num' = '3')",
+                        tableName));
+
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s VALUES "
+                                        + "(1, ARRAY['tag1', 'tag2'], 
MAP['age', 25], ROW('Beijing', 100000), 95.5), "
+                                        + "(2, ARRAY['tag3'], MAP['age', 30, 
'level', 5], ROW('Shanghai', 200000), 88.0)",
+                                tableName))
+                .await();
+
+        // Test projection: select id, tags, address
+        String query = String.format("select id, tags, address from %s", 
tableName);
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        String.format(
+                                "TableSourceScan(table=[[testcatalog, 
defaultdb, %s, project=[id, tags, address]]]",

Review Comment:
   This execution-plan assertion only checks the `project=[...]` part of the 
TableSourceScan output. It should also check the `fields=[...]` part, for 
consistency with the other projection pushdown tests in this file (e.g., 
testAppendTableProjectPushDown, testTableProjectPushDown), which assert both.
   ```suggestion
                                   "TableSourceScan(table=[[testcatalog, 
defaultdb, %s, project=[id, tags, address], fields=[id, tags, metadata, 
address, score]]]",
   ```



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -325,6 +325,224 @@ void testTableProjectPushDown(String mode) throws 
Exception {
         assertThat(actual).containsExactlyElementsOf(expected);
     }
 
+    @Test
+    void testProjectPushDownWithComplexTypes() throws Exception {
+        // Test projection pushdown for tables with Array, Map, and Row types
+        // This test verifies support for issue #1979
+        tEnv.executeSql(
+                "create table complex_type_projection_test ("
+                        + "a bigint, "
+                        + "b string, "
+                        + "c map<string, int>, "
+                        + "d row<d1 int, d2 string>, "
+                        + "e array<string>, "
+                        + "f int"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO complex_type_projection_test VALUES "
+                                + "(1, 'value1', MAP['key1', 10], ROW(100, 
'row1'), ARRAY['a', 'b'], 1000), "
+                                + "(2, 'value2', MAP['key2', 20], ROW(200, 
'row2'), ARRAY['c', 'd'], 2000), "
+                                + "(3, 'value3', MAP['key3', 30], ROW(300, 
'row3'), ARRAY['e', 'f'], 3000)")
+                .await();
+
+        // Test projection: select a, c, d, f (including complex types)
+        String query = "select a, c, d, f from complex_type_projection_test";
+
+        // Verify projection pushdown in the execution plan
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
complex_type_projection_test, project=[a, c, d, f]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, {key1=10}, +I[100, row1], 1000]",
+                        "+I[2, {key2=20}, +I[200, row2], 2000]",
+                        "+I[3, {key3=30}, +I[300, row3], 3000]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithArrayTypes() throws Exception {
+        // Test projection focusing on array types
+        tEnv.executeSql(
+                "create table array_projection_test ("
+                        + "id int, "
+                        + "int_array array<int>, "
+                        + "string_array array<string>, "
+                        + "name string"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO array_projection_test VALUES "
+                                + "(1, ARRAY[1, 2, 3], ARRAY['a', 'b', 'c'], 
'name1'), "
+                                + "(2, ARRAY[4, 5], ARRAY['d', 'e'], 'name2'), 
"
+                                + "(3, ARRAY[6, 7, 8, 9], ARRAY['f'], 
'name3')")
+                .await();
+
+        // Test projection: select id, int_array, string_array
+        String query = "select id, int_array, string_array from 
array_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
array_projection_test, project=[id, int_array, string_array]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, [1, 2, 3], [a, b, c]]",
+                        "+I[2, [4, 5], [d, e]]",
+                        "+I[3, [6, 7, 8, 9], [f]]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithMapTypes() throws Exception {
+        // Test projection focusing on map types
+        tEnv.executeSql(
+                "create table map_projection_test ("
+                        + "id int, "
+                        + "map1 map<string, int>, "
+                        + "map2 map<int, string>, "
+                        + "description string"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO map_projection_test VALUES "
+                                + "(1, MAP['k1', 10, 'k2', 20], MAP[1, 'v1', 
2, 'v2'], 'desc1'), "
+                                + "(2, MAP['k3', 30], MAP[3, 'v3'], 'desc2'), "
+                                + "(3, MAP['k4', 40, 'k5', 50, 'k6', 60], 
MAP[4, 'v4'], 'desc3')")
+                .await();
+
+        // Test projection: select id, map1, map2
+        String query = "select id, map1, map2 from map_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
map_projection_test, project=[id, map1, map2]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, {k1=10, k2=20}, {1=v1, 2=v2}]",
+                        "+I[2, {k3=30}, {3=v3}]",
+                        "+I[3, {k4=40, k5=50, k6=60}, {4=v4}]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithRowTypes() throws Exception {
+        // Test projection focusing on row types
+        tEnv.executeSql(
+                "create table row_projection_test ("
+                        + "id int, "
+                        + "simple_row row<x int, y string>, "
+                        + "nested_row row<a int, b row<c int, d string>>, "
+                        + "val bigint"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO row_projection_test VALUES "
+                                + "(1, ROW(10, 'str1'), ROW(100, ROW(1000, 
'nested1')), 1000), "
+                                + "(2, ROW(20, 'str2'), ROW(200, ROW(2000, 
'nested2')), 2000), "
+                                + "(3, ROW(30, 'str3'), ROW(300, ROW(3000, 
'nested3')), 3000)")
+                .await();
+
+        // Test projection: select id, simple_row, nested_row
+        String query = "select id, simple_row, nested_row from 
row_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
row_projection_test, project=[id, simple_row, nested_row]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, +I[10, str1], +I[100, +I[1000, nested1]]]",
+                        "+I[2, +I[20, str2], +I[200, +I[2000, nested2]]]",
+                        "+I[3, +I[30, str3], +I[300, +I[3000, nested3]]]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithNullComplexTypes() throws Exception {
+        // Test projection with null complex type values
+        tEnv.executeSql(
+                "create table null_complex_test ("
+                        + "id int, "
+                        + "arr array<int>, "
+                        + "mp map<string, int>, "
+                        + "rw row<a int, b string>, "
+                        + "name string"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO null_complex_test VALUES "
+                                + "(1, ARRAY[1, 2], MAP['k1', 1], ROW(10, 
'test'), 'name1'), "
+                                + "(2, CAST(NULL AS ARRAY<INT>), MAP['k2', 2], 
ROW(20, 'test2'), 'name2'), "
+                                + "(3, ARRAY[3], CAST(NULL AS MAP<STRING, 
INT>), ROW(30, 'test3'), 'name3'), "
+                                + "(4, ARRAY[4], MAP['k4', 4], CAST(NULL AS 
ROW<a INT, b STRING>), 'name4')")
+                .await();
+
+        // Test projection: select id, arr, mp, rw
+        String query = "select id, arr, mp, rw from null_complex_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
null_complex_test, project=[id, arr, mp, rw]]]");

Review Comment:
   This execution-plan assertion only checks the `project=[...]` part of the 
TableSourceScan output. It should also check the `fields=[...]` part, for 
consistency with the other projection pushdown tests in this file (e.g., 
testAppendTableProjectPushDown, testTableProjectPushDown), which assert both.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -325,6 +325,224 @@ void testTableProjectPushDown(String mode) throws 
Exception {
         assertThat(actual).containsExactlyElementsOf(expected);
     }
 
+    @Test
+    void testProjectPushDownWithComplexTypes() throws Exception {
+        // Test projection pushdown for tables with Array, Map, and Row types
+        // This test verifies support for issue #1979
+        tEnv.executeSql(
+                "create table complex_type_projection_test ("
+                        + "a bigint, "
+                        + "b string, "
+                        + "c map<string, int>, "
+                        + "d row<d1 int, d2 string>, "
+                        + "e array<string>, "
+                        + "f int"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO complex_type_projection_test VALUES "
+                                + "(1, 'value1', MAP['key1', 10], ROW(100, 
'row1'), ARRAY['a', 'b'], 1000), "
+                                + "(2, 'value2', MAP['key2', 20], ROW(200, 
'row2'), ARRAY['c', 'd'], 2000), "
+                                + "(3, 'value3', MAP['key3', 30], ROW(300, 
'row3'), ARRAY['e', 'f'], 3000)")
+                .await();
+
+        // Test projection: select a, c, d, f (including complex types)
+        String query = "select a, c, d, f from complex_type_projection_test";
+
+        // Verify projection pushdown in the execution plan
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
complex_type_projection_test, project=[a, c, d, f]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, {key1=10}, +I[100, row1], 1000]",
+                        "+I[2, {key2=20}, +I[200, row2], 2000]",
+                        "+I[3, {key3=30}, +I[300, row3], 3000]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithArrayTypes() throws Exception {
+        // Test projection focusing on array types
+        tEnv.executeSql(
+                "create table array_projection_test ("
+                        + "id int, "
+                        + "int_array array<int>, "
+                        + "string_array array<string>, "
+                        + "name string"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO array_projection_test VALUES "
+                                + "(1, ARRAY[1, 2, 3], ARRAY['a', 'b', 'c'], 
'name1'), "
+                                + "(2, ARRAY[4, 5], ARRAY['d', 'e'], 'name2'), 
"
+                                + "(3, ARRAY[6, 7, 8, 9], ARRAY['f'], 
'name3')")
+                .await();
+
+        // Test projection: select id, int_array, string_array
+        String query = "select id, int_array, string_array from 
array_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
array_projection_test, project=[id, int_array, string_array]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, [1, 2, 3], [a, b, c]]",
+                        "+I[2, [4, 5], [d, e]]",
+                        "+I[3, [6, 7, 8, 9], [f]]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithMapTypes() throws Exception {
+        // Test projection focusing on map types
+        tEnv.executeSql(
+                "create table map_projection_test ("
+                        + "id int, "
+                        + "map1 map<string, int>, "
+                        + "map2 map<int, string>, "
+                        + "description string"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO map_projection_test VALUES "
+                                + "(1, MAP['k1', 10, 'k2', 20], MAP[1, 'v1', 
2, 'v2'], 'desc1'), "
+                                + "(2, MAP['k3', 30], MAP[3, 'v3'], 'desc2'), "
+                                + "(3, MAP['k4', 40, 'k5', 50, 'k6', 60], 
MAP[4, 'v4'], 'desc3')")
+                .await();
+
+        // Test projection: select id, map1, map2
+        String query = "select id, map1, map2 from map_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
map_projection_test, project=[id, map1, map2]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, {k1=10, k2=20}, {1=v1, 2=v2}]",
+                        "+I[2, {k3=30}, {3=v3}]",
+                        "+I[3, {k4=40, k5=50, k6=60}, {4=v4}]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithRowTypes() throws Exception {
+        // Test projection focusing on row types
+        tEnv.executeSql(
+                "create table row_projection_test ("
+                        + "id int, "
+                        + "simple_row row<x int, y string>, "
+                        + "nested_row row<a int, b row<c int, d string>>, "
+                        + "val bigint"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO row_projection_test VALUES "
+                                + "(1, ROW(10, 'str1'), ROW(100, ROW(1000, 
'nested1')), 1000), "
+                                + "(2, ROW(20, 'str2'), ROW(200, ROW(2000, 
'nested2')), 2000), "
+                                + "(3, ROW(30, 'str3'), ROW(300, ROW(3000, 
'nested3')), 3000)")
+                .await();
+
+        // Test projection: select id, simple_row, nested_row
+        String query = "select id, simple_row, nested_row from 
row_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
row_projection_test, project=[id, simple_row, nested_row]]]");

Review Comment:
   This execution-plan assertion only checks the `project=[...]` part of the 
TableSourceScan output. It should also check the `fields=[...]` part, for 
consistency with the other projection pushdown tests in this file (e.g., 
testAppendTableProjectPushDown, testTableProjectPushDown), which assert both.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -325,6 +325,224 @@ void testTableProjectPushDown(String mode) throws 
Exception {
         assertThat(actual).containsExactlyElementsOf(expected);
     }
 
+    @Test
+    void testProjectPushDownWithComplexTypes() throws Exception {
+        // Test projection pushdown for tables with Array, Map, and Row types
+        // This test verifies support for issue #1979
+        tEnv.executeSql(
+                "create table complex_type_projection_test ("
+                        + "a bigint, "
+                        + "b string, "
+                        + "c map<string, int>, "
+                        + "d row<d1 int, d2 string>, "
+                        + "e array<string>, "
+                        + "f int"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO complex_type_projection_test VALUES "
+                                + "(1, 'value1', MAP['key1', 10], ROW(100, 
'row1'), ARRAY['a', 'b'], 1000), "
+                                + "(2, 'value2', MAP['key2', 20], ROW(200, 
'row2'), ARRAY['c', 'd'], 2000), "
+                                + "(3, 'value3', MAP['key3', 30], ROW(300, 
'row3'), ARRAY['e', 'f'], 3000)")
+                .await();
+
+        // Test projection: select a, c, d, f (including complex types)
+        String query = "select a, c, d, f from complex_type_projection_test";
+
+        // Verify projection pushdown in the execution plan
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
complex_type_projection_test, project=[a, c, d, f]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, {key1=10}, +I[100, row1], 1000]",
+                        "+I[2, {key2=20}, +I[200, row2], 2000]",
+                        "+I[3, {key3=30}, +I[300, row3], 3000]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithArrayTypes() throws Exception {
+        // Test projection focusing on array types
+        tEnv.executeSql(
+                "create table array_projection_test ("
+                        + "id int, "
+                        + "int_array array<int>, "
+                        + "string_array array<string>, "
+                        + "name string"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO array_projection_test VALUES "
+                                + "(1, ARRAY[1, 2, 3], ARRAY['a', 'b', 'c'], 
'name1'), "
+                                + "(2, ARRAY[4, 5], ARRAY['d', 'e'], 'name2'), 
"
+                                + "(3, ARRAY[6, 7, 8, 9], ARRAY['f'], 
'name3')")
+                .await();
+
+        // Test projection: select id, int_array, string_array
+        String query = "select id, int_array, string_array from 
array_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
array_projection_test, project=[id, int_array, string_array]]]");

Review Comment:
   This execution-plan assertion only checks the `project=[...]` part of the 
TableSourceScan output. It should also check the `fields=[...]` part, for 
consistency with the other projection pushdown tests in this file (e.g., 
testAppendTableProjectPushDown, testTableProjectPushDown), which assert both.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -325,6 +325,224 @@ void testTableProjectPushDown(String mode) throws 
Exception {
         assertThat(actual).containsExactlyElementsOf(expected);
     }
 
+    @Test
+    void testProjectPushDownWithComplexTypes() throws Exception {
+        // Test projection pushdown for tables with Array, Map, and Row types
+        // This test verifies support for issue #1979
+        tEnv.executeSql(
+                "create table complex_type_projection_test ("
+                        + "a bigint, "
+                        + "b string, "
+                        + "c map<string, int>, "
+                        + "d row<d1 int, d2 string>, "
+                        + "e array<string>, "
+                        + "f int"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO complex_type_projection_test VALUES "
+                                + "(1, 'value1', MAP['key1', 10], ROW(100, 
'row1'), ARRAY['a', 'b'], 1000), "
+                                + "(2, 'value2', MAP['key2', 20], ROW(200, 
'row2'), ARRAY['c', 'd'], 2000), "
+                                + "(3, 'value3', MAP['key3', 30], ROW(300, 
'row3'), ARRAY['e', 'f'], 3000)")
+                .await();
+
+        // Test projection: select a, c, d, f (including complex types)
+        String query = "select a, c, d, f from complex_type_projection_test";
+
+        // Verify projection pushdown in the execution plan
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
complex_type_projection_test, project=[a, c, d, f]]]");

Review Comment:
   This execution-plan assertion only checks the `project=[...]` part of the 
TableSourceScan output. It should also check the `fields=[...]` part, for 
consistency with the other projection pushdown tests in this file (e.g., 
testAppendTableProjectPushDown, testTableProjectPushDown), which assert both.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -325,6 +325,224 @@ void testTableProjectPushDown(String mode) throws Exception {
         assertThat(actual).containsExactlyElementsOf(expected);
     }
 
+    @Test
+    void testProjectPushDownWithComplexTypes() throws Exception {
+        // Test projection pushdown for tables with Array, Map, and Row types
+        // This test verifies support for issue #1979
+        tEnv.executeSql(
+                "create table complex_type_projection_test ("
+                        + "a bigint, "
+                        + "b string, "
+                        + "c map<string, int>, "
+                        + "d row<d1 int, d2 string>, "
+                        + "e array<string>, "
+                        + "f int"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO complex_type_projection_test VALUES "
+                                + "(1, 'value1', MAP['key1', 10], ROW(100, 'row1'), ARRAY['a', 'b'], 1000), "
+                                + "(2, 'value2', MAP['key2', 20], ROW(200, 'row2'), ARRAY['c', 'd'], 2000), "
+                                + "(3, 'value3', MAP['key3', 30], ROW(300, 'row3'), ARRAY['e', 'f'], 3000)")
+                .await();
+
+        // Test projection: select a, c, d, f (including complex types)
+        String query = "select a, c, d, f from complex_type_projection_test";
+
+        // Verify projection pushdown in the execution plan
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, complex_type_projection_test, project=[a, c, d, f]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, {key1=10}, +I[100, row1], 1000]",
+                        "+I[2, {key2=20}, +I[200, row2], 2000]",
+                        "+I[3, {key3=30}, +I[300, row3], 3000]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithArrayTypes() throws Exception {
+        // Test projection focusing on array types
+        tEnv.executeSql(
+                "create table array_projection_test ("
+                        + "id int, "
+                        + "int_array array<int>, "
+                        + "string_array array<string>, "
+                        + "name string"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO array_projection_test VALUES "
+                                + "(1, ARRAY[1, 2, 3], ARRAY['a', 'b', 'c'], 'name1'), "
+                                + "(2, ARRAY[4, 5], ARRAY['d', 'e'], 'name2'), "
+                                + "(3, ARRAY[6, 7, 8, 9], ARRAY['f'], 'name3')")
+                .await();
+
+        // Test projection: select id, int_array, string_array
+        String query = "select id, int_array, string_array from array_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, array_projection_test, project=[id, int_array, string_array]]]");
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, [1, 2, 3], [a, b, c]]",
+                        "+I[2, [4, 5], [d, e]]",
+                        "+I[3, [6, 7, 8, 9], [f]]");
+        assertResultsIgnoreOrder(rowIter, expected, true);
+    }
+
+    @Test
+    void testProjectPushDownWithMapTypes() throws Exception {
+        // Test projection focusing on map types
+        tEnv.executeSql(
+                "create table map_projection_test ("
+                        + "id int, "
+                        + "map1 map<string, int>, "
+                        + "map2 map<int, string>, "
+                        + "description string"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO map_projection_test VALUES "
+                                + "(1, MAP['k1', 10, 'k2', 20], MAP[1, 'v1', 2, 'v2'], 'desc1'), "
+                                + "(2, MAP['k3', 30], MAP[3, 'v3'], 'desc2'), "
+                                + "(3, MAP['k4', 40, 'k5', 50, 'k6', 60], MAP[4, 'v4'], 'desc3')")
+                .await();
+
+        // Test projection: select id, map1, map2
+        String query = "select id, map1, map2 from map_projection_test";
+
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, map_projection_test, project=[id, map1, map2]]]");

Review Comment:
   The assertion for the execution plan should include the fields specification 
for consistency with other projection pushdown tests in this file. Other tests 
(e.g., testAppendTableProjectPushDown, testTableProjectPushDown) check for both 
project and fields in the TableSourceScan output.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to