This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c7beda0da81 [FLINK-33327] Window TVF column expansion does not work
with an INSERT INTO
c7beda0da81 is described below
commit c7beda0da81ffc4bbb01befafd2eed08b7b35854
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Oct 20 15:09:21 2023 +0200
[FLINK-33327] Window TVF column expansion does not work with an INSERT INTO
---
.../planner/calcite/FlinkCalciteSqlValidator.java | 8 ++++++-
.../plan/stream/sql/ColumnExpansionTest.java | 25 ++++++++++++++++++++++
2 files changed, 32 insertions(+), 1 deletion(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index 0b0075a4f64..a8cd9265e69 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -54,6 +54,7 @@ import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlSnapshot;
+import org.apache.calcite.sql.SqlTableFunction;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindowTableFunction;
import org.apache.calcite.sql.parser.SqlParserPos;
@@ -441,7 +442,7 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
}
final SqlFunction function = (SqlFunction) call.getOperator();
- if (function.getFunctionType() !=
SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION) {
+ if (!isTableFunction(function)) {
return null;
}
@@ -459,4 +460,9 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
})
.collect(Collectors.toList());
}
+
+ private static boolean isTableFunction(SqlFunction function) {
+ return function instanceof SqlTableFunction
+ || function.getFunctionType() ==
SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION;
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
index 290b981583e..331cc13f36c 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
@@ -260,6 +260,31 @@ public class ColumnExpansionTest {
"agg");
}
+ @Test
+ public void
testExplicitTableWithinTableFunctionWithInsertIntoNamedColumns() {
+ tableEnv.getConfig()
+ .set(
+ TABLE_COLUMN_EXPANSION_STRATEGY,
+
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+
+ tableEnv.executeSql(
+ "CREATE TABLE sink (\n"
+ + " a STRING,\n"
+ + " c BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ")");
+
+ // Test case for FLINK-33327, we can not assert column names of an
INSERT INTO query. Make
+ // sure the query can be planned.
+ tableEnv.explainSql(
+ "INSERT INTO sink(a, c) "
+ + "SELECT t3_s, COUNT(t3_i) FROM "
+ + " TABLE(TUMBLE(TABLE t3, DESCRIPTOR(t3_m_virtual),
INTERVAL '1' MINUTE)) "
+ + "GROUP BY t3_s;");
+ }
+
private void assertColumnNames(String sql, String... columnNames) {
assertThat(tableEnv.sqlQuery(sql).getResolvedSchema().getColumnNames())
.containsExactly(columnNames);