[
https://issues.apache.org/jira/browse/FLINK-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stefano updated FLINK-13944:
----------------------------
Description:
(The project in which I encounter the error is attached.)
I am using the Scala streaming API and the {{StreamTableEnvironment}}.
Given the classes:
{code:scala}
import java.sql.Timestamp  // assumed; the import is not shown in the original snippet

object EntityType extends Enumeration {
  type EntityType = Value
  val ACTIVITY = Value
}

sealed trait Entity extends Serializable

case class Activity(card_id: Long, date_time: Timestamp, second: Long,
                    station_id: Long, station_name: String, activity_code: Long,
                    amount: Long) extends Entity
{code}
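The code below uses {{tableEnv}} and {{partialComputation1}} without showing how they are created; for context, this is roughly the surrounding setup (a minimal sketch, with the construction of {{partialComputation1}} omitted because it is project-specific):
{code:scala}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// partialComputation1 (built earlier in the job) is a DataStream of tuples
// whose first field is an EntityType value and whose third field is an Entity.
{code}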
What I am trying to do is convert the table, after a selection, into an append stream:
{code:scala}
/** activity table **/
val activityDataStream = partialComputation1
  .filter(_._1 == EntityType.ACTIVITY)
  .map(x => x._3.asInstanceOf[Activity])

tableEnv.registerDataStream("activity", activityDataStream, 'card_id,
  'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)

val selectedTable = tableEnv.scan("activity").select("card_id, second")
selectedTable.printSchema()
// root
//  |-- card_id: BIGINT
//  |-- second: BIGINT

// ATTEMPT 1
// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)
// output.print

// ATTEMPT 2
// val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)
// output.print

// ATTEMPT 3
// val output = tableEnv.toAppendStream[Row](selectedTable)
// output.print

// ATTEMPT 4
case class Test(card_id: Long, second: Long) extends Entity
val output = tableEnv.toAppendStream[Test](selectedTable)
output.print
{code}
With every one of the four attempts above, the error I get is the same:
{code:bash}
$ flink run target/scala-2.11/app-assembly-0.1.jar
Starting execution of program
root
|-- card_id: BIGINT
|-- second: BIGINT
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9954823e0b55a8140f78be6868c85399)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at bds_comparison.flink.metrocard.App$.main(App.scala:141)
    at bds_comparison.flink.metrocard.App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
    ... 23 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 26, Column 21: Unexpected selector 'package' after "."
    at org.codehaus.janino.Parser.compileException(Parser.java:3482)
    at org.codehaus.janino.Parser.parseSelector(Parser.java:3147)
    at org.codehaus.janino.Parser.parseUnaryExpression(Parser.java:2761)
    at org.codehaus.janino.Parser.parseMultiplicativeExpression(Parser.java:2717)
    at org.codehaus.janino.Parser.parseAdditiveExpression(Parser.java:2696)
    at org.codehaus.janino.Parser.parseShiftExpression(Parser.java:2675)
    at org.codehaus.janino.Parser.parseRelationalExpression(Parser.java:2599)
    at org.codehaus.janino.Parser.parseEqualityExpression(Parser.java:2573)
    at org.codehaus.janino.Parser.parseAndExpression(Parser.java:2552)
    at org.codehaus.janino.Parser.parseExclusiveOrExpression(Parser.java:2531)
    at org.codehaus.janino.Parser.parseInclusiveOrExpression(Parser.java:2510)
    at org.codehaus.janino.Parser.parseConditionalAndExpression(Parser.java:2489)
    at org.codehaus.janino.Parser.parseConditionalOrExpression(Parser.java:2468)
    at org.codehaus.janino.Parser.parseConditionalExpression(Parser.java:2449)
    at org.codehaus.janino.Parser.parseAssignmentExpression(Parser.java:2428)
    at org.codehaus.janino.Parser.parseExpression(Parser.java:2413)
    at org.codehaus.janino.Parser.parseBlockStatement(Parser.java:1611)
    at org.codehaus.janino.Parser.parseBlockStatements(Parser.java:1544)
    at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1381)
    at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:834)
    at org.codehaus.janino.Parser.parseClassBody(Parser.java:732)
    at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:638)
    at org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:366)
    at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:237)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
    ... 10 more
{code}
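For completeness, here is a self-contained sketch of the same {{registerDataStream}} / {{scan}} / {{select}} / {{toAppendStream}} sequence with the upstream computation replaced by an inline sample element. The {{MinimalRepro}} name and the sample values are placeholders rather than code from the attached project, and the {{Entity}} / {{Activity}} definitions above are assumed to be on the classpath:
{code:scala}
import java.sql.Timestamp

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

object MinimalRepro {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Stand-in for partialComputation1: a single hard-coded Activity.
    val activities: DataStream[Activity] = env.fromElements(
      Activity(1L, new Timestamp(0L), 42L, 7L, "Central", 1L, 100L))

    tableEnv.registerDataStream("activity", activities, 'card_id,
      'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)

    val selectedTable = tableEnv.scan("activity").select("card_id, second")

    // Same as ATTEMPT 4 above: convert the selection to an append stream.
    case class Test(card_id: Long, second: Long) extends Entity
    val output = tableEnv.toAppendStream[Test](selectedTable)
    output.print()

    env.execute("toAppendStream minimal sketch")
  }
}
{code}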
> Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.
> --------------------------------------------------------------------------------
>
> Key: FLINK-13944
> URL: https://issues.apache.org/jira/browse/FLINK-13944
> Project: Flink
> Issue Type: Bug
> Components: API / Scala, Table SQL / API
> Affects Versions: 1.8.1, 1.9.0
> Environment: {{$ java -version}}
> {{ openjdk version "1.8.0_222"}}
> {{ OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)}}
> {{ OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)}}
> {{------}}
> {{$ scala -version}}
> {{Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL}}
> {{------}}
> {{build.sbt}}
> [...]
> ThisBuild / scalaVersion := "2.11.12"
> val flinkVersion = "1.9.0"
> val flinkDependencies = Seq(
>   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided")
> [...]
>
> Reporter: Stefano
> Priority: Major
> Attachments: app.zip
>
>
--
This message was sent by Atlassian Jira
(v8.3.2#803003)