Stefano created FLINK-13944:
-------------------------------
Summary: Table.toAppendStream: InvalidProgramException: Table
program cannot be compiled.
Key: FLINK-13944
URL: https://issues.apache.org/jira/browse/FLINK-13944
Project: Flink
Issue Type: Bug
Components: API / Scala, Table SQL / API
Affects Versions: 1.9.0, 1.8.1
Environment: {{$ java -version}}
{{ openjdk version "1.8.0_222"}}
{{ OpenJDK Runtime Environment (build
1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)}}
{{ OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)}}
{{------}}
{{$ scala -version}}
{{Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL}}
{{------}}
{{build.}}{{sbt}}
[...]
ThisBuild / scalaVersion := "2.11.12"
val flinkVersion = "1.9.0"
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided")
[...]
Reporter: Stefano
Attachments: app.zip
{{Using: Scala streaming API and the StreamTableEnvironment.}}
Given the classes:
{{object EntityType extends Enumeration {}}
{{ type EntityType = Value}}
{{ val ACTIVITY = Value}}
{{}}}
{{sealed trait Entity extends Serializable}}
{{case class Activity(card_id: Long, date_time: Timestamp, second: Long,
station_id: Long, station_name: String, activity_code: Long, amount: Long)
extends Entity}}
What I try to do is{{ to convert a table after selection to an appendStream.}}
{{/** activity table **/}}
{{val activityDataStream = partialComputation1}}
{{ .filter(_._1 == EntityType.ACTIVITY)}}
{{ .map(x => x._3.asInstanceOf[Activity])}}
{{tableEnv.registerDataStream("activity", activityDataStream, 'card_id,
'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)}}
{{val selectedTable = tableEnv.scan("activity").select("card_id, second")}}
{{selectedTable.printSchema()}}
{{// root}}
{{// |-- card_id: BIGINT}}
{{// |-- second: BIGINT}}
{{// ATTEMPT 1}}
{{// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)}}
{{// output.print}}
{{// ATTEMPT 2}}
{{// val output = tableEnv.toAppendStream[(java.lang.Long,
java.lang.Long)](selectedTable)}}
{{// output.print}}
{{// ATTEMPT 3}}
{{// val output = tableEnv.toAppendStream[Row](selectedTable)}}
{{// output.print}}
{{// ATTEMPT 4}}
{{case class Test(card_id: Long, second: Long) extends Entity}}{{val output =
tableEnv.toAppendStream[Test](selectedTable)}}
{{output.print}}
The result for each of the attempts is always the same:
{{------------------------------------------- The program finished with the
following exception:}}
{{org.apache.flink.client.program.ProgramInvocationException: Job failed.
(JobID: 334fe364c516008ca34b76e27c5c6f79) at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at
... 23 more
Caused by: org.apache.flink.api.common.InvalidProgramException: *Table program
cannot be compiled. This is a bug. Please file an issue.* at
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at
org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
at
org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at
java.lang.Thread.run(Thread.java:748)}}
My project in which I face the error is attached.
--
This message was sent by Atlassian Jira
(v8.3.2#803003)