[ 
https://issues.apache.org/jira/browse/FLINK-38950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebastien Pereira updated FLINK-38950:
--------------------------------------
    Description: 
Creating a view that contains a window table function (TUMBLE, HOP, SESSION, or 
CUMULATE) combined with an *ORDER BY* clause results in malformed SQL being 
stored in the catalog. The expanded SQL contains duplicated ORDER BY clauses, 
causing a *SqlParseException* when the view is queried.

 
{code:sql}
CREATE TABLE orders (
  orderId   INT,
  price     DECIMAL(10, 2),
  quantity  INT,
  ts        TIMESTAMP_LTZ(3),
  WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '10'
);

-- Create a view with window TVF and ORDER BY
CREATE VIEW tumble_view AS 
SELECT * 
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
ORDER BY ts;

-- Query the view (this fails)
SELECT * FROM tumble_view;

-- Explain also fails with the same error
EXPLAIN SELECT * FROM tumble_view;
{code}
The view creation succeeds, but any operation that references the view fails with:
{code:java}
org.apache.calcite.sql.parser.SqlParseException: Encountered "ORDER" at line 5, 
column 1.
{code}
h3. Root cause analysis

In 
[*SqlNodeConvertUtils.toCatalogView()*|https://github.com/apache/flink/blob/release-2.2.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java#L88],
 when generating the expanded SQL for the view, *context.toQuotedSqlString()* 
uses the input *query* node after it has been mutated by {*}validate(){*}, 
instead of using the validated output *validateQuery* node.

Calcite's *validate()* method mutates the input SqlNode, causing the *ORDER BY* 
clause to be duplicated. The expanded SQL stored in the catalog becomes:
{code:sql}
SELECT * FROM TABLE(TUMBLE(...)) ORDER BY ts ORDER BY ts
{code}
When the view is created:
 # *SqlNodeConvertUtils.toCatalogView()* is called
 # *validate(query)* mutates the input query node
 # *toQuotedSqlString(query)* uses the mutated node
 # The generated SQL has a {*}duplicated ORDER BY{*}: ... ORDER BY ts ORDER BY ts
 # The malformed SQL is stored in the catalog as the view's expanded query
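
The creation steps above can be mimicked with a toy model. The sketch below is not Flink or Calcite code: the classes SelectNode and OrderByNode and the validate() function are hypothetical stand-ins, and the way the toy validate() folds the sort keys into the inner query is only an assumption about how an in-place mutation could produce the duplication. It illustrates why unparsing the input node after validation yields two ORDER BY clauses, while unparsing the node returned by validation yields one.

```java
import java.util.ArrayList;
import java.util.List;

public class ValidateMutationSketch {

    /** Hypothetical stand-in for a SQL AST node that can be unparsed to text. */
    interface Node {
        String toSql();
    }

    /** Toy SELECT node with an optional, mutable ORDER BY list. */
    static final class SelectNode implements Node {
        final String body;
        final List<String> orderBy = new ArrayList<>();

        SelectNode(String body) {
            this.body = body;
        }

        @Override
        public String toSql() {
            return orderBy.isEmpty()
                    ? body
                    : body + " ORDER BY " + String.join(", ", orderBy);
        }
    }

    /** Toy wrapper node holding the sort keys around an inner SELECT. */
    static final class OrderByNode implements Node {
        final SelectNode query;
        final List<String> keys;

        OrderByNode(SelectNode query, List<String> keys) {
            this.query = query;
            this.keys = keys;
        }

        @Override
        public String toSql() {
            return query.toSql() + " ORDER BY " + String.join(", ", keys);
        }
    }

    /**
     * Toy validate(): moves the wrapper's sort keys into the inner SELECT
     * in place and returns that SELECT. The input wrapper still holds its
     * own keys, so it now describes the ordering twice (assumed mechanism).
     */
    static Node validate(Node input) {
        if (input instanceof OrderByNode) {
            OrderByNode wrapper = (OrderByNode) input;
            wrapper.query.orderBy.addAll(wrapper.keys);
            return wrapper.query;
        }
        return input;
    }

    public static void main(String[] args) {
        Node query = new OrderByNode(
                new SelectNode("SELECT * FROM TABLE(TUMBLE(...))"), List.of("ts"));
        Node validated = validate(query);

        // Unparsing the mutated input node duplicates the clause:
        // SELECT * FROM TABLE(TUMBLE(...)) ORDER BY ts ORDER BY ts
        System.out.println(query.toSql());

        // Unparsing the validated output node does not:
        // SELECT * FROM TABLE(TUMBLE(...)) ORDER BY ts
        System.out.println(validated.toSql());
    }
}
```

In this toy model, the remedy matches the one suggested by the analysis: build the stored SQL from the node returned by validation rather than from the node passed into it.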

On query or explain:
 # The view must be expanded to be integrated into the query plan, so the 
stored expanded SQL is retrieved from the catalog
 # *CalciteParser.parse()* attempts to parse the expanded SQL
 # Parser encounters the *duplicated ORDER BY* and fails
 # _Error: SqlParseException: Encountered "ORDER" at line 5, column 1_

 
{code:java}
        at 
org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:61)
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:367)
        at 
org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
        at 
org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
        at 
org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70)
        at 
org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4142)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2997)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2529)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2435)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2380)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:758)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:746)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3967)
        at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:650)
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:235)
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:210)
        at 
org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:82)
        at 
org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
        at 
org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:133)
        at 
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:231)
        at 
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:221)
        at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:917)
 

[...OR...]

at 
org.apache.flink.table.api.TableEnvironment.explainSql(TableEnvironment.java:1480){code}


> Views with window TVFs and ORDER BY generate malformed SQL with duplicated 
> ORDER BY clauses
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38950
>                 URL: https://issues.apache.org/jira/browse/FLINK-38950
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.2.0
>         Environment: N/A
>            Reporter: Sebastien Pereira
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
