Sebastien Pereira created FLINK-38950:
-----------------------------------------
Summary: Views with window TVFs and ORDER BY generate malformed
SQL with duplicated ORDER BY clauses
Key: FLINK-38950
URL: https://issues.apache.org/jira/browse/FLINK-38950
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 2.2.0
Environment: N/A
Reporter: Sebastien Pereira
Creating a view that contains a window table function (TUMBLE, HOP, SESSION, or
CUMULATE) combined with an *ORDER BY* clause results in malformed SQL being
stored in the catalog. The expanded SQL contains duplicated ORDER BY clauses,
causing a *SqlParseException* when the view is queried.
{code:sql}
CREATE TABLE orders (
    orderId INT,
    price DECIMAL(10, 2),
    quantity INT,
    ts TIMESTAMP_LTZ(3),
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '10'
);

-- Create a view with window TVF and ORDER BY
CREATE VIEW tumble_view AS
SELECT *
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
ORDER BY ts;

-- Query the view (this fails)
SELECT * FROM tumble_view;

-- Explain also fails with the same error
EXPLAIN SELECT * FROM tumble_view;
{code}
The view creation succeeds, but any operation that references the view fails with:
{code:java}
org.apache.calcite.sql.parser.SqlParseException: Encountered "ORDER" at line 5,
column 1.
{code}
h3. Root cause analysis
In
[*SqlNodeConvertUtils.toCatalogView()*|https://github.com/apache/flink/blob/release-2.2.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java#L88],
when generating the expanded SQL for the view, *context.toQuotedSqlString()*
uses the input *query* node after it has been mutated by {*}validate(){*},
instead of using the validated output *validateQuery* node.
Calcite's *validate()* method mutates the input SqlNode, causing the *ORDER BY*
clause to be duplicated. The expanded SQL stored in the catalog becomes:
{code:sql}
SELECT * FROM TABLE(TUMBLE(...)) ORDER BY ts ORDER BY ts
{code}
When the view is created:
# *SqlNodeConvertUtils.toCatalogView()* is called
# *validate(query)* mutates the input query node
# *toQuotedSqlString(query)* uses the mutated node
# Generated SQL has a {*}duplicated ORDER BY{*}: ... ORDER BY ts ORDER BY ts
# Malformed SQL is stored in catalog as the view's expanded query
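The creation steps above can be illustrated with a small, self-contained toy model. It does not use Calcite or Flink classes: *Select*, *OrderedQuery* and this *validate()* are hypothetical stand-ins, and the exact way the mutation happens here (sort keys pushed from an outer wrapper node into the inner select) is an assumption made for illustration only. The sketch only shows why unparsing the mutated input node can print ORDER BY twice, while unparsing the validated output node does not.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are hypothetical and are NOT Calcite's SqlNode types.
final class ViewExpansionSketch {

    /** Toy SELECT node with an optional, mutable ORDER BY list. */
    static final class Select {
        final String body;
        final List<String> orderBy = new ArrayList<>();

        Select(String body) {
            this.body = body;
        }

        String toSql() {
            return orderBy.isEmpty()
                    ? body
                    : body + " ORDER BY " + String.join(", ", orderBy);
        }
    }

    /** Toy wrapper node: a query followed by a trailing ORDER BY, as a parser might build it. */
    static final class OrderedQuery {
        final Select query;
        final List<String> orderBy;

        OrderedQuery(Select query, List<String> orderBy) {
            this.query = query;
            this.orderBy = orderBy;
        }

        String toSql() {
            return query.toSql() + " ORDER BY " + String.join(", ", orderBy);
        }
    }

    /**
     * Assumed behavior: validation moves the sort keys into the inner select,
     * mutating the input tree, and returns the rewritten node.
     */
    static Select validate(OrderedQuery input) {
        input.query.orderBy.addAll(input.orderBy);
        return input.query;
    }

    public static void main(String[] args) {
        OrderedQuery query =
                new OrderedQuery(new Select("SELECT * FROM TABLE(TUMBLE(...))"), List.of("ts"));
        Select validated = validate(query);

        // Unparsing the mutated input node repeats the clause.
        System.out.println(query.toSql());
        // Unparsing the validated output node prints it once.
        System.out.println(validated.toSql());
    }
}
```

In this model the first line printed ends in "ORDER BY ts ORDER BY ts" and the second in a single "ORDER BY ts", which mirrors the difference between unparsing *query* and unparsing *validateQuery*.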
On query or explain:
# The view must be expanded to integrate it into the query plan: the stored expanded SQL is retrieved from the catalog
# *CalciteParser.parse()* attempts to parse the expanded SQL
# Parser encounters the *duplicated ORDER BY* and fails
# _Error: SqlParseException: Encountered "ORDER" at line 5, column 1_
{code:java}
at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:61)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:367)
at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
at org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70)
at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4142)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2997)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2529)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2435)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2380)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:758)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:746)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3967)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:650)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:235)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:210)
at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:82)
at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:133)
at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:231)
at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:221)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:917)
[...OR...]
at org.apache.flink.table.api.TableEnvironment.explainSql(TableEnvironment.java:1480)
{code}