[ https://issues.apache.org/jira/browse/FLINK-16068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037541#comment-17037541 ]

Benchao Li commented on FLINK-16068:
------------------------------------

[~pangliang] Thanks for reporting this, and for the nice example SQL, which 
makes the problem easy to reproduce.

[~jark] After looking into the stack trace and the source code, I found the 
cause: in {{CatalogSourceTable}}:L103 we use the field names and the computed 
column expressions to construct a new expression list, and for non-computed 
columns we simply use the bare field name as the expression.
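
To illustrate the failure mode, here is a hypothetical, simplified version of 
the expression list that gets built for the reporter's table (illustration 
only, not the actual Flink code):

{code:java}
// Hypothetical illustration: one SQL expression string per column.
String[] exprs = {
    "log",        // plain column -> bare field name, parses fine
    "time",       // plain column -> bare field name, but TIME is a reserved keyword
    "PROCTIME()"  // computed column -> its expression
};
// The planner later re-parses these strings (CalciteParser.parse), and the
// bare "time" fails with: SQL parse failed. Encountered "time" ...
{code}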

IMO, a simple fix is to add escape characters (backticks) around the field 
name. (Actually, we ran into this problem in our internal computed column 
implementation as well, and resolved it the same way.)
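
A minimal sketch of that fix (the helper names {{isComputedColumn}} and 
{{computedColumnExprs}} below are assumptions for illustration, not the actual 
Flink code):

{code:java}
// Sketch: back-quote every physical field name when building the
// expression list, so reserved keywords survive re-parsing.
String[] exprs = new String[fieldNames.length];
for (int i = 0; i < fieldNames.length; i++) {
    exprs[i] = isComputedColumn(i)
        ? computedColumnExprs[i]      // keep the computed expression as-is
        : "`" + fieldNames[i] + "`";  // time -> `time`
}
{code}

With this, the expression generated for the {{time}} column becomes 
{{`time`}}, which the parser accepts as a quoted identifier instead of the 
keyword TIME.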

> table with keyword-escaped columns and computed_column_expression columns
> -------------------------------------------------------------------------
>
>                 Key: FLINK-16068
>                 URL: https://issues.apache.org/jira/browse/FLINK-16068
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.10.0
>            Reporter: pangliang
>            Priority: Major
>             Fix For: 1.10.1
>
>
> I used sql-client to create a table with a keyword-escaped column and a
> computed column (computed_column_expression), like this:
> {code:java}
> CREATE TABLE source_kafka (
>     log STRING,
>     `time` BIGINT,
>     pt as proctime()
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'k8s-logs',
>   'connector.startup-mode' = 'latest-offset',
>   'connector.properties.zookeeper.connect' = 'zk-1.zk:2181,zk-2.zk:2181,zk-3.zk:2181/kafka',
>   'connector.properties.bootstrap.servers' = 'kafka.default:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'format.type' = 'json',
>   'format.fail-on-missing-field' = 'true',
>   'update-mode' = 'append'
> );
> {code}
> Then I simply used it:
> {code:java}
> SELECT * from source_kafka limit 10;{code}
> and got an exception:
> {code:java}
> java.io.IOException: Fail to run stream sql job
>       at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:164)
>       at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callSelect(FlinkStreamSqlInterpreter.java:108)
>       at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:203)
>       at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151)
>       at org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:104)
>       at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:103)
>       at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:676)
>       at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:569)
>       at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
>       at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:121)
>       at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "time" at line 1, column 12.
> Was expecting one of:
>     "ABS" ...
>     "ARRAY" ...
>     "AVG" ...
>     "CARDINALITY" ...
>     "CASE" ...
>     "CAST" ...
>     "CEIL" ...
>     "CEILING" ...
>     ......
>
>       at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>       at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>       at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>       at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>       at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>       at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>       at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>       at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>       at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>       at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>       at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>       at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>       at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>       at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>       at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>       at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>       at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>       at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:104)
>       ... 13 more
> {code}
> I also did some tests; the following can run:
> {code:java}
> CREATE TABLE source_kafka (
>     log STRING,
>     `aaaaa` BIGINT,
>     pt as proctime()
> );
>
> CREATE TABLE source_kafka (
>     log STRING,
>     `time` BIGINT
> );
>
> CREATE TABLE source_kafka (
>     log STRING,
>     pt as proctime()
> );{code}
> Cannot run: the same table with both a computed column and an escaped
> keyword column, e.g. `time`, `select`, `string`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
