[
https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126322#comment-17126322
]
Benchao Li commented on FLINK-18070:
------------------------------------
As you can see, we only convert the time attributes from the input of the sink.
Actually, the sink node can have its own time attributes too, which is why this
line exists:
```scala
// the LogicalSink is converted in RelTimeIndicatorConverter before
if (rootRel.isInstanceOf[LogicalLegacySink] ||
    !needFinalTimeIndicatorConversion) {
  return convertedRoot
}
```
I think the right way to fix this is to leverage the
`needFinalTimeIndicatorConversion` variable in
`RelTimeIndicatorConverter.convert`, which is always true for now. We need to
distinguish whether the current optimization block is a final block or
an intermediate logical block.
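To illustrate the idea, here is a minimal toy sketch in plain Java. It is not planner code: `TimeIndicatorSketch`, `FieldType`, `convert`, `canApplyWindow` and the `isFinalBlock` flag are hypothetical stand-ins, and the real change would go through `needFinalTimeIndicatorConversion`. It only shows why an intermediate block should keep the time attribute while a final block may materialize it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these types are hypothetical stand-ins, not Flink planner classes.
class TimeIndicatorSketch {
    enum FieldType { PROCTIME_ATTRIBUTE, TIMESTAMP, INT }

    // Materializes time attributes to plain TIMESTAMP only when the block is final;
    // an intermediate block passes its row type through unchanged.
    static List<FieldType> convert(List<FieldType> rowType, boolean isFinalBlock) {
        List<FieldType> converted = new ArrayList<>();
        for (FieldType type : rowType) {
            boolean materialize = isFinalBlock && type == FieldType.PROCTIME_ATTRIBUTE;
            converted.add(materialize ? FieldType.TIMESTAMP : type);
        }
        return converted;
    }

    // A downstream window aggregate needs its time field to still be a time attribute.
    static boolean canApplyWindow(List<FieldType> rowType, int timeFieldIndex) {
        return rowType.get(timeFieldIndex) == FieldType.PROCTIME_ATTRIBUTE;
    }

    public static void main(String[] args) {
        // Output row type of the shared view block: (ts PROCTIME, order_type INT).
        List<FieldType> viewOutput =
                Arrays.asList(FieldType.PROCTIME_ATTRIBUTE, FieldType.INT);

        // Treated as an intermediate block: the window on `ts` is still valid.
        System.out.println(canApplyWindow(convert(viewOutput, false), 0)); // prints "true"

        // Treated as a final block: `ts` is materialized, so the window would fail.
        System.out.println(canApplyWindow(convert(viewOutput, true), 0)); // prints "false"
    }
}
```

In this sketch the second case corresponds to the reported type mismatch: the view block was converted as if it were final, so the downstream window sees a plain TIMESTAMP.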
> Time attribute been materialized after sub graph optimize
> ---------------------------------------------------------
>
> Key: FLINK-18070
> URL: https://issues.apache.org/jira/browse/FLINK-18070
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: YufeiLiu
> Assignee: YufeiLiu
> Priority: Major
> Fix For: 1.11.0
>
>
> Hi, I want to use a window aggregate on a temporary view that feeds multiple
> sinks, but it throws an exception:
> {code:java}
> java.lang.AssertionError: type mismatch:
> ref:
> TIME ATTRIBUTE(PROCTIME) NOT NULL
> input:
> TIMESTAMP(3) NOT NULL
> {code}
> I looked into the optimizer logic; there is a comment in
> {{CommonSubGraphBasedOptimizer}}:
> "1. In general, for multi-sinks users tend to use VIEW which is a natural
> common sub-graph."
> After sub-graph optimization, the time attribute from the source has been
> converted to the basic TIMESTAMP type according to
> {{FlinkRelTimeIndicatorProgram}}. But my CREATE VIEW SQL is a simple query, so
> in theory the time attribute should not need to be materialized.
> Here is my code:
> {code:java}
> // connector.type COLLECTION is for debug use
> tableEnv.sqlUpdate("CREATE TABLE source (\n" +
>         " `ts` AS PROCTIME(),\n" +
>         " `order_type` INT\n" +
>         ") WITH (\n" +
>         " 'connector.type' = 'COLLECTION',\n" +
>         " 'format.type' = 'json'\n" +
>         ")\n");
> tableEnv.createTemporaryView("source_view",
>         tableEnv.sqlQuery("SELECT * FROM source"));
> tableEnv.sqlUpdate("CREATE TABLE sink (\n" +
>         " `result` BIGINT\n" +
>         ") WITH (\n" +
>         " 'connector.type' = 'COLLECTION',\n" +
>         " 'format.type' = 'json'\n" +
>         ")\n");
> tableEnv.sqlUpdate("INSERT INTO sink \n" +
>         "SELECT\n" +
>         " COUNT(1)\n" +
>         "FROM\n" +
>         " `source_view`\n" +
>         "WHERE\n" +
>         " `order_type` = 33\n" +
>         "GROUP BY\n" +
>         " TUMBLE(`ts`, INTERVAL '5' SECOND)\n");
> tableEnv.sqlUpdate("INSERT INTO sink \n" +
>         "SELECT\n" +
>         " COUNT(1)\n" +
>         "FROM\n" +
>         " `source_view`\n" +
>         "WHERE\n" +
>         " `order_type` = 34\n" +
>         "GROUP BY\n" +
>         " TUMBLE(`ts`, INTERVAL '5' SECOND)\n");
> {code}