[ https://issues.apache.org/jira/browse/FLINK-8256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290846#comment-16290846 ]
Ryan Brideau commented on FLINK-8256:
-------------------------------------
Thanks for looking into this so quickly. I managed to track down the root of
the issue on my end. I had built my project previously using the snapshot
archetype, and not the newest 1.4.0 one:
{code:java}
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.4-SNAPSHOT
{code}
To fix the problem, I built a new empty project using the 1.4.0 archetype
version, diffed its pom.xml against my existing one, and updated the existing
one to match. Everything now works perfectly. I suspect that anybody who
generated a project from the snapshot archetype recently might find themselves
in the same situation.
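
For anyone in the same situation, the comparison described above can be sketched roughly as follows. This is only an illustration: the helper name `compare_poms` and the directory names in the usage line are placeholders, not taken from the original project, and the archetype command is the one shown earlier with only the version changed to 1.4.0.

```shell
#!/bin/sh
# Step 1 (run manually, needs network access): generate an empty project from
# the released 1.4.0 archetype instead of 1.4-SNAPSHOT.
#   mvn archetype:generate \
#     -DarchetypeGroupId=org.apache.flink \
#     -DarchetypeArtifactId=flink-quickstart-scala \
#     -DarchetypeVersion=1.4.0

# Step 2: print a unified diff between the existing pom.xml and the fresh one.
# compare_poms is a hypothetical helper name used only for this sketch.
compare_poms() {
  status=0
  # diff exits with 1 when the files differ; that is expected here, not an error.
  diff -u "$1" "$2" || status=$?
  if [ "$status" -gt 1 ]; then
    echo "diff failed (is one of the files missing?)" >&2
    return "$status"
  fi
  return 0
}
```

Usage, with placeholder paths: `compare_poms my-project/pom.xml fresh-1.4.0/pom.xml`. Lines prefixed with `-` exist only in the old pom.xml and lines prefixed with `+` only in the freshly generated one.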
> Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
> --------------------------------------------------------------------------
>
> Key: FLINK-8256
> URL: https://issues.apache.org/jira/browse/FLINK-8256
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.4.0
> Environment: macOS, Local Flink v1.4.0, Scala 2.11
> Reporter: Ryan Brideau
>
> I built the newest release locally today, but when I try to filter a stream
> using an anonymous or named function, I get an error. Here's a simple example:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
>
> object TestFunction {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val params = ParameterTool.fromArgs(args)
>     env.getConfig.setGlobalJobParameters(params)
>     val someArray = Array(1, 2, 3)
>     val stream = env.fromCollection(someArray).filter(_ => true)
>     stream.print().setParallelism(1)
>     env.execute("Testing Function")
>   }
> }
> {code}
> This results in:
> {code:java}
> Job execution switched to status FAILING.
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> ... 6 more
> 12/13/2017 15:10:01 Job execution switched to status FAILED.
> ------------------------------------------------------------
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
> at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
> at org.peopleinmotion.TestFunction.main(TestFunction.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
> at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
> at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> {code}
> However, replacing the function with this results in everything working as
> expected:
> {code:java}
> // Requires: import org.apache.flink.api.common.functions.FilterFunction
> val stream = env.fromCollection(someArray).filter(new FilterFunction[Int] {
>   override def filter(t: Int): Boolean = true
> })
> {code}
> Perhaps something changed between the previous build and this one, since
> this worked without issue before?