[
https://issues.apache.org/jira/browse/FLINK-8256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290720#comment-16290720
]
Till Rohrmann commented on FLINK-8256:
--------------------------------------
Thanks for reporting the issue [~Brideau]. I tried to reproduce the issue by
taking the example program and submitting it to a vanilla Flink 1.4.0 cluster
(Hadoop free version downloaded from flink.apache.org, Hadoop 2.7 version
downloaded from flink.apache.org and version built from sources).
Unfortunately, I was not able to reproduce the problem. Therefore I suspect
that it has something to do with either your or my setup.
Could you maybe share all the logs with us?
How did you build the job? Did you use the Flink quickstarts?
> Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
> --------------------------------------------------------------------------
>
> Key: FLINK-8256
> URL: https://issues.apache.org/jira/browse/FLINK-8256
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.4.0
> Environment: macOS, Local Flink v1.4.0, Scala 2.11
> Reporter: Ryan Brideau
>
> I built the newest release locally today, but when I try to filter a stream
> using an anonymous or named function, I get an error. Here's a simple example:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object TestFunction {
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val params = ParameterTool.fromArgs(args)
> env.getConfig.setGlobalJobParameters(params)
> val someArray = Array(1,2,3)
> val stream = env.fromCollection(someArray).filter(_ => true)
> stream.print().setParallelism(1)
> env.execute("Testing Function")
> }
> }
> {code}
> This results in:
> {code:java}
> Job execution switched to status FAILING.
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of
> org.peopleinmotion.TestFunction$$anonfun$1 to field
> org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type
> scala.Function1 in instance of
> org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> ... 6 more
> 12/13/2017 15:10:01 Job execution switched to status FAILED.
> ------------------------------------------------------------
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> at
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
> at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
> at org.peopleinmotion.TestFunction.main(TestFunction.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
> at
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
> at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
> at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot instantiate user function.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of
> org.peopleinmotion.TestFunction$$anonfun$1 to field
> org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type
> scala.Function1 in instance of
> org.apache.flink.streaming.api.scala.DataStream$$anon$7
> at
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> at
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> {code}
> However, replacing the function with this results in everything working as
> expected:
> {code:java}
> val stream = env.fromCollection(someArray).filter(new FilterFunction[Int] {
> override def filter(t: Int): Boolean = true
> })
> {code}
> Perhaps something changed in the new build compared to the previous, as this
> was working without issue before?
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)