Sebastien Pereira created FLINK-38950:
-----------------------------------------

             Summary: Views with window TVFs and ORDER BY generate malformed SQL with duplicated ORDER BY clauses
                 Key: FLINK-38950
                 URL: https://issues.apache.org/jira/browse/FLINK-38950
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 2.2.0
         Environment: N/A
            Reporter: Sebastien Pereira


Creating a view that contains a window table-valued function (TUMBLE, HOP, SESSION, or 
CUMULATE) combined with an *ORDER BY* clause results in malformed SQL being 
stored in the catalog. The expanded SQL contains duplicated ORDER BY clauses, 
causing a *SqlParseException* when the view is queried or explained.

 
{code:sql}
CREATE TABLE orders (
  orderId   INT,
  price     DECIMAL(10, 2),
  quantity  INT,
  ts        TIMESTAMP_LTZ(3),
  WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '10'
);

-- Create a view with window TVF and ORDER BY
CREATE VIEW tumble_view AS 
SELECT * 
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
ORDER BY ts;

-- Query the view (this fails)
SELECT * FROM tumble_view;

-- Explain also fails with the same error
EXPLAIN SELECT * FROM tumble_view;
{code}
The view creation succeeds, but any operation that references the view fails with:
{code:java}
org.apache.calcite.sql.parser.SqlParseException: Encountered "ORDER" at line 5, column 1.
{code}
h3. Root cause analysis

In 
[*SqlNodeConvertUtils.toCatalogView()*|https://github.com/apache/flink/blob/release-2.2.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java#L88], 
when the expanded SQL for the view is generated, *context.toQuotedSqlString()* 
is called on the input *query* node after it has been mutated by {*}validate(){*}, 
instead of on the validated output *validateQuery* node.

Calcite's *validate()* method mutates the input SqlNode in place, so unparsing that 
node emits the *ORDER BY* clause twice. The expanded SQL stored in the catalog becomes:
{code:sql}
SELECT * FROM TABLE(TUMBLE(...)) ORDER BY ts ORDER BY ts
{code}
When the view is created:
 # *SqlNodeConvertUtils.toCatalogView()* is called
 # *validate(query)* mutates the input query node
 # *toQuotedSqlString(query)* uses the mutated node
 # The generated SQL has a {*}duplicated ORDER BY{*}: ... ORDER BY ts ORDER BY ts
 # The malformed SQL is stored in the catalog as the view's expanded query
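The mutation described above can be illustrated with a small, self-contained Java model. All class and method names below (*OrderByMutationSketch*, *SelectNode*, *OrderByNode*, *validate*) are hypothetical and are not Flink or Calcite APIs. The model simply assumes that validation rewrites an ORDER BY wrapper by copying its order list into the inner SELECT in place and returning that SELECT. Under that assumption, unparsing the original input node emits the ORDER BY twice, while unparsing the returned node emits it once.

```java
import java.util.List;

/** Hypothetical toy model of the bug; not Flink or Calcite code. */
public class OrderByMutationSketch {

    // Hypothetical stand-in for a query node that can be unparsed back to SQL.
    interface Node {
        String unparse();
    }

    // Hypothetical stand-in for a SELECT with an optional ORDER BY list.
    static final class SelectNode implements Node {
        final String body;
        List<String> orderList; // null until a rewrite sets it

        SelectNode(String body) {
            this.body = body;
        }

        @Override
        public String unparse() {
            return orderList == null
                    ? body
                    : body + " ORDER BY " + String.join(", ", orderList);
        }
    }

    // Hypothetical stand-in for an ORDER BY wrapper around a SELECT.
    static final class OrderByNode implements Node {
        final SelectNode query;
        final List<String> orderList;

        OrderByNode(SelectNode query, List<String> orderList) {
            this.query = query;
            this.orderList = orderList;
        }

        @Override
        public String unparse() {
            // The wrapper always appends its own ORDER BY after the inner query.
            return query.unparse() + " ORDER BY " + String.join(", ", orderList);
        }
    }

    // Hypothetical validator: pushes the ORDER BY list into the inner SELECT
    // (mutating the input tree) and returns the SELECT as the validated node.
    static Node validate(Node input) {
        if (input instanceof OrderByNode) {
            OrderByNode orderBy = (OrderByNode) input;
            orderBy.query.orderList = orderBy.orderList;
            return orderBy.query;
        }
        return input;
    }

    public static void main(String[] args) {
        SelectNode select = new SelectNode("SELECT * FROM TABLE(TUMBLE(...))");
        Node query = new OrderByNode(select, List.of("ts"));
        Node validated = validate(query);

        // Buggy path, unparsing the mutated input node:
        // prints SELECT * FROM TABLE(TUMBLE(...)) ORDER BY ts ORDER BY ts
        System.out.println(query.unparse());
        // Fixed path, unparsing the validated output node:
        // prints SELECT * FROM TABLE(TUMBLE(...)) ORDER BY ts
        System.out.println(validated.unparse());
    }
}
```

In this model, the difference between the two printed lines mirrors calling *toQuotedSqlString()* on *query* versus on *validateQuery*.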

When the view is queried or explained:
 # The view must be expanded to be integrated into the query plan
 # The stored expanded SQL is retrieved from the catalog
 # *CalciteParser.parse()* attempts to parse the expanded SQL
 # The parser encounters the *duplicated ORDER BY* and fails
 # _Error: SqlParseException: Encountered "ORDER" at line 5, column 1_

 
{code:java}
        at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:61)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:367)
        at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
        at org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
        at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70)
        at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57)
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4142)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2997)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2529)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2435)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2380)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:758)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:746)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3967)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:650)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:235)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:210)
        at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:82)
        at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
        at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:133)
        at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:231)
        at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:221)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:917)

[...OR...]

        at org.apache.flink.table.api.TableEnvironment.explainSql(TableEnvironment.java:1480)
{code}


