[
https://issues.apache.org/jira/browse/FLINK-18548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Leonard Xu updated FLINK-18548:
-------------------------------
Description:
In Flink 1.10 we introduced the computed column feature, but I found that dimension (lookup) tables do not support it. The JDBC dimension table below declares a computed column (currency_next):
{code:java}
public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
    "  currency_id BIGINT,\n" +
    "  currency_name STRING,\n" +
    "  rate DECIMAL(38, 4),\n" +
    "  currency_time TIMESTAMP(3),\n" +
    "  country STRING,\n" +
    "  timestamp6 TIMESTAMP(6),\n" +
    "  currency_next AS currency_id + 1,\n" +
    "  time6 TIME(6),\n" +
    "  gdp DECIMAL(10, 4)\n" +
    ") WITH (\n" +
    "  'connector.type' = 'jdbc',\n" +
    "  'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
    "  'connector.username' = 'root',\n" +
    "  'connector.table' = 'currency',\n" +
    "  'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
    "  'connector.lookup.cache.max-rows' = '500',\n" +
    "  'connector.lookup.cache.ttl' = '10s',\n" +
    "  'connector.lookup.max-retries' = '3'\n" +
    ")";
{code}
{code}
Exception in thread "main" java.lang.ClassCastException: org.apache.calcite.rel.logical.LogicalProject cannot be cast to org.apache.calcite.rel.core.TableScan
	at org.apache.calcite.sql2rel.SqlToRelConverter.snapshotTemporalTable(SqlToRelConverter.java:2438)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2062)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2085)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:148)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:523)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:437)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:343)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
	at job.KafkaJoinJdbc2Jdbc.main(KafkaJoinJdbc2Jdbc.java:59)

Process finished with exit code 1
{code}
was:
FlinkCalcMergeRule should merge calc nodes even when the outer calc does not reference all of the inner calc's fields; currently, the following logical plan cannot be merged as expected:
{code:java}
FlinkLogicalJoin(condition=[AND(=($0, $4), >($1, 1))], joinType=[left])
:- FlinkLogicalCalc(select=[id, len, content], where=[>(id, 1)])
:  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, len, content])
+- FlinkLogicalSnapshot(period=[$cor0.proctime])
   +- FlinkLogicalCalc(select=[age, id, name], where=[AND(>(age, 20), =(name, _UTF-16LE'Fabian':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
      +- FlinkLogicalCalc(select=[age, id, name, PROCTIME() AS proctime])
         +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, userTable]], fields=[age, id, name])
{code}
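For comparison, a merged version would keep a single calc under the snapshot, since the proctime projection of the inner calc is not referenced above it and can be dropped. The fragment below is a hand-written sketch of the expected shape of the right-hand side, not actual optimizer output:
{code:java}
+- FlinkLogicalSnapshot(period=[$cor0.proctime])
   +- FlinkLogicalCalc(select=[age, id, name], where=[AND(>(age, 20), =(name, _UTF-16LE'Fabian':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, userTable]], fields=[age, id, name])
{code}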
The corresponding SQL to reproduce this issue:
{code:java}
CREATE TABLE userTable (
`id` BIGINT,
`len` BIGINT,
`content` STRING,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'values',
'data-id' = '$dataId',
'bounded' = 'true'
)
SELECT T.id, T.len, T.content, D.name FROM T JOIN userTable
FOR SYSTEM_TIME AS OF T.proctime AS D ON T.id = D.id
{code}
> Improve FlinkCalcMergeRule to merge calc nodes better
> -----------------------------------------------------
>
> Key: FLINK-18548
> URL: https://issues.apache.org/jira/browse/FLINK-18548
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.12.0
> Reporter: Leonard Xu
> Assignee: Leonard Xu
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)