This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c7beda0da81 [FLINK-33327] Window TVF column expansion does not work 
with an INSERT INTO
c7beda0da81 is described below

commit c7beda0da81ffc4bbb01befafd2eed08b7b35854
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Oct 20 15:09:21 2023 +0200

    [FLINK-33327] Window TVF column expansion does not work with an INSERT INTO
---
 .../planner/calcite/FlinkCalciteSqlValidator.java  |  8 ++++++-
 .../plan/stream/sql/ColumnExpansionTest.java       | 25 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index 0b0075a4f64..a8cd9265e69 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -54,6 +54,7 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.SqlSnapshot;
+import org.apache.calcite.sql.SqlTableFunction;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.SqlWindowTableFunction;
 import org.apache.calcite.sql.parser.SqlParserPos;
@@ -441,7 +442,7 @@ public final class FlinkCalciteSqlValidator extends 
SqlValidatorImpl {
         }
         final SqlFunction function = (SqlFunction) call.getOperator();
 
-        if (function.getFunctionType() != 
SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION) {
+        if (!isTableFunction(function)) {
             return null;
         }
 
@@ -459,4 +460,9 @@ public final class FlinkCalciteSqlValidator extends 
SqlValidatorImpl {
                         })
                 .collect(Collectors.toList());
     }
+
+    private static boolean isTableFunction(SqlFunction function) {
+        return function instanceof SqlTableFunction
+                || function.getFunctionType() == 
SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
index 290b981583e..331cc13f36c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
@@ -260,6 +260,31 @@ public class ColumnExpansionTest {
                 "agg");
     }
 
+    @Test
+    public void 
testExplicitTableWithinTableFunctionWithInsertIntoNamedColumns() {
+        tableEnv.getConfig()
+                .set(
+                        TABLE_COLUMN_EXPANSION_STRATEGY,
+                        
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+
+        tableEnv.executeSql(
+                "CREATE TABLE sink (\n"
+                        + "  a STRING,\n"
+                        + "  c BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ")");
+
+        // Test case for FLINK-33327, we can not assert column names of an 
INSERT INTO query. Make
+        // sure the query can be planned.
+        tableEnv.explainSql(
+                "INSERT INTO sink(a, c) "
+                        + "SELECT t3_s, COUNT(t3_i) FROM "
+                        + " TABLE(TUMBLE(TABLE t3, DESCRIPTOR(t3_m_virtual), 
INTERVAL '1' MINUTE)) "
+                        + "GROUP BY t3_s;");
+    }
+
     private void assertColumnNames(String sql, String... columnNames) {
         assertThat(tableEnv.sqlQuery(sql).getResolvedSchema().getColumnNames())
                 .containsExactly(columnNames);

Reply via email to