[
https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388074#comment-15388074
]
Till Rohrmann commented on FLINK-4250:
--------------------------------------
The problem seems to be more general than the {{CsvTableSource}}. The following
code also fails:
{code}
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val csvFilePath = "table-jobs/src/main/resources/input.csv"
val tblEnv = TableEnvironment.getTableEnvironment(env)
val inputDS = env.fromElements((1, "foo", 1.0, "12"))
tblEnv.registerDataSet("foobar", inputDS, 'key, 'user, 'value, 'timestamp)
val input = tblEnv.sql("SELECT value FROM foobar")
tblEnv.toDataSet[Row](input).print()
}
{code}
But this time it fails with
{code}
Exception in thread "main" org.apache.calcite.sql.parser.SqlParseException:
Encountered "value" at line 1, column 8.
Was expecting one of:
...
{code}
> Cannot select column from CsvTableSource
> ----------------------------------------
>
> Key: FLINK-4250
> URL: https://issues.apache.org/jira/browse/FLINK-4250
> Project: Flink
> Issue Type: Bug
> Components: Scala API, Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
>
> Using the Scala Table API and the {{CsvTableSource}} I cannot select a column
> from the csv source. The following code:
> {code}
> package com.dataartisans.batch
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
> import org.apache.flink.api.scala._
> import org.apache.flink.api.table.sources.CsvTableSource
> import org.apache.flink.api.table.{Row, TableEnvironment, Table}
> object CsvTableAPIJob {
> def main(args: Array[String]): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val csvFilePath = "table-jobs/src/main/resources/input.csv"
> val tblEnv = TableEnvironment.getTableEnvironment(env)
> val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value",
> "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO,
> BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO,
> BasicTypeInfo.STRING_TYPE_INFO))
> tblEnv.registerTableSource("foobar", csvTS)
> val input = tblEnv.sql("SELECT user FROM foobar")
> tblEnv.toDataSet[Row](input).print()
> }
> }
> {code}
> fails with
> {code}
> Exception in thread "main"
> org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER
> at
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
> at
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782)
> at
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
> at
> org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168)
> at
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
> at
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286)
> at
> org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
> at
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
> at
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
> at
> org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271)
> at
> org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
> at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21)
> at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)