[
https://issues.apache.org/jira/browse/FLINK-38950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sebastien Pereira updated FLINK-38950:
--------------------------------------
Description:
Creating a view that contains a window table function (TUMBLE, HOP, SESSION, or
CUMULATE) combined with an *ORDER BY* clause results in malformed SQL being
stored in the catalog. The expanded SQL contains duplicated ORDER BY clauses,
causing *SqlParserException* when the view is queried.
{code:sql}
CREATE TABLE orders (
orderId INT,
price DECIMAL(10, 2),
quantity INT,
ts TIMESTAMP_LTZ(3),
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
);
-- Create a view with window TVF and ORDER BY
CREATE VIEW tumble_view AS
SELECT *
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
ORDER BY ts;
-- Query the view (this fails)
SELECT * FROM tumble_view;
-- Explain also fails with the same error
EXPLAIN SELECT * FROM tumble_view;
{code}
The view creation succeeds, but operation that references the view fails with:
{code:java}
org.apache.calcite.sql.parser.SqlParseException: Encountered "ORDER" at line 5,
column 1.
{code}
h3. Root cause analyses
In
[*SqlNodeConvertUtils.toCatalogView()*|https://github.com/apache/flink/blob/release-2.2.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java#L88]
When generating the expanded SQL for the view, *context.toQuotedSqlString()*
uses the input *query* node after it has been mutated by {*}validate(){*},
instead of using the validated output *validateQuery* node:
Calcite's *validate()* method mutates the input SqlNode, causing the *ORDER BY*
clause to be duplicated. The expanded SQL stored in the catalog becomes:
{code:sql}
SELECT * FROM TABLE(TUMBLE(...)) ORDER BY ts ORDER BY ts
{code}
When the view is created:
# *SqlNodeConvertUtils.toCatalogView()* is called
# *validate(query)* mutates the input query node
# *toQuotedSqlString(query)* uses the mutated node
# Generated SQL has {*}duplicated ORDER BY{*}: ** ... ORDER BY ts ORDER BY ts
# Malformed SQL is stored in catalog as the view's expanded query
On query or explain:
# the view needs to be expanded to be integrated it into the query plan: the
stored expanded SQL is retrieved from the catalog
# *CalciteParser.parse()* attempts to parse the expanded SQL
# Parser encounters the *duplicated ORDER BY* and fails
# _Error: SqlParseException: Encountered "ORDER" at line 5, column 1_
{code:java}
at
org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:61)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:367)
at
org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
at
org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
at
org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70)
at
org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57)
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4142)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2997)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2529)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2435)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2380)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:758)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:746)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3967)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:650)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:235)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:210)
at
org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:82)
at
org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
at
org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:133)
at
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:231)
at
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:221)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:917)
[...OR...]
at
org.apache.flink.table.api.TableEnvironment.explainSql(TableEnvironment.java:1480){code}
was:
Creating a view that contains a window table function (TUMBLE, HOP, SESSION, or
CUMULATE) combined with an *ORDER BY* clause results in malformed SQL being
stored in the catalog. The expanded SQL contains duplicated ORDER BY clauses,
causing *SqlParserException* when the view is queried.
{code:sql}
CREATE TABLE orders (
orderId INT,
price DECIMAL(10, 2),
quantity INT,
ts TIMESTAMP_LTZ(3),
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
);
-- Create a view with window TVF and ORDER BY
CREATE VIEW tumble_view AS
SELECT *
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
ORDER BY ts;
-- Query the view (this fails)
SELECT * FROM tumble_view;
-- Explain also fails with the same error
EXPLAIN SELECT * FROM tumble_view;
{code}
The view creation succeeds, but operation that references the view fails with:
{code:java}
org.apache.calcite.sql.parser.SqlParseException: Encountered "ORDER" at line 5,
column 1.
{code}
h3. Root cause analyses
In
[*SqlNodeConvertUtils.toCatalogView()*|https://github.com/apache/flink/blob/release-2.2.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java#L88]
When generating the expanded SQL for the view, *context.toQuotedSqlString()*
uses the input *query* node after it has been mutated by {*}validate(){*},
instead of using the validated output *validateQuery* node:
Calcite's *validate()* method mutates the input SqlNode, causing the *ORDER BY*
clause to be duplicated. The expanded SQL stored in the catalog becomes:
{code:sql}
SELECT * FROM TABLE(TUMBLE(...)) ORDER BY ts ORDER BY ts
{code}
When the view is created:
# *SqlNodeConvertUtils.toCatalogView()* is called
# *validate(query)* mutates the input query node
# *toQuotedSqlString(query)* uses the mutated node
# Generated SQL has {*}duplicated ORDER BY{*}: ** ... ORDER BY ts ORDER BY ts
# Malformed SQL is stored in catalog as the view's expanded query
On query or explain:
# the view needs to be expanded to be integrated it into the query plan: the
stored
# expanded SQL is retrieved from the catalog
# *CalciteParser.parse()* attempts to parse the expanded SQL
# Parser encounters the *duplicated ORDER BY* and fails
# _Error: SqlParseException: Encountered "ORDER" at line 5, column 1_
{code:java}
at
org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:61)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:367)
at
org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
at
org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
at
org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70)
at
org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57)
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4142)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2997)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2529)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2435)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2380)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:758)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:746)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3967)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:650)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:235)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:210)
at
org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:82)
at
org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
at
org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:133)
at
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:231)
at
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:221)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:917)
[...OR...]
at
org.apache.flink.table.api.TableEnvironment.explainSql(TableEnvironment.java:1480){code}
> Views with window TVFs and ORDER BY generate malformed SQL with duplicated
> ORDER BY clauses
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-38950
> URL: https://issues.apache.org/jira/browse/FLINK-38950
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.2.0
> Environment: N/A
> Reporter: Sebastien Pereira
> Priority: Major
> Labels: pull-request-available
>
> Creating a view that contains a window table function (TUMBLE, HOP, SESSION,
> or CUMULATE) combined with an *ORDER BY* clause results in malformed SQL
> being stored in the catalog. The expanded SQL contains duplicated ORDER BY
> clauses, causing *SqlParserException* when the view is queried.
>
> {code:sql}
> CREATE TABLE orders (
> orderId INT,
> price DECIMAL(10, 2),
> quantity INT,
> ts TIMESTAMP_LTZ(3),
> WATERMARK FOR ts AS ts
> ) WITH (
> 'connector' = 'datagen',
> 'number-of-rows' = '10'
> );
> -- Create a view with window TVF and ORDER BY
> CREATE VIEW tumble_view AS
> SELECT *
> FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
> ORDER BY ts;
> -- Query the view (this fails)
> SELECT * FROM tumble_view;
> -- Explain also fails with the same error
> EXPLAIN SELECT * FROM tumble_view;
> {code}
> The view creation succeeds, but operation that references the view fails with:
> {code:java}
> org.apache.calcite.sql.parser.SqlParseException: Encountered "ORDER" at line
> 5, column 1.
> {code}
> h3. Root cause analyses
> In
> [*SqlNodeConvertUtils.toCatalogView()*|https://github.com/apache/flink/blob/release-2.2.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java#L88]
> When generating the expanded SQL for the view, *context.toQuotedSqlString()*
> uses the input *query* node after it has been mutated by {*}validate(){*},
> instead of using the validated output *validateQuery* node:
> Calcite's *validate()* method mutates the input SqlNode, causing the *ORDER
> BY* clause to be duplicated. The expanded SQL stored in the catalog becomes:
> {code:sql}
> SELECT * FROM TABLE(TUMBLE(...)) ORDER BY ts ORDER BY ts
> {code}
> When the view is created:
> # *SqlNodeConvertUtils.toCatalogView()* is called
> # *validate(query)* mutates the input query node
> # *toQuotedSqlString(query)* uses the mutated node
> # Generated SQL has {*}duplicated ORDER BY{*}: ** ... ORDER BY ts ORDER BY ts
> # Malformed SQL is stored in catalog as the view's expanded query
> On query or explain:
> # the view needs to be expanded to be integrated it into the query plan: the
> stored expanded SQL is retrieved from the catalog
> # *CalciteParser.parse()* attempts to parse the expanded SQL
> # Parser encounters the *duplicated ORDER BY* and fails
> # _Error: SqlParseException: Encountered "ORDER" at line 5, column 1_
>
> {code:java}
> at
> org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:61)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:367)
> at
> org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
> at
> org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
> at
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70)
> at
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4142)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2997)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2529)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2435)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2380)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:758)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:746)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3967)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:650)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:235)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:210)
> at
> org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:82)
> at
> org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
> at
> org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:133)
> at
> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:231)
> at
> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:221)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:917)
>
> [...OR...]
> at
> org.apache.flink.table.api.TableEnvironment.explainSql(TableEnvironment.java:1480){code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)