@Shivam and @Ryan:

My first feeling would be the following: You have the Scala library in your
user code, and thus through the reversed class loading, the scala function
types get duplicated.

The right way to fix that is to make sure you build a proper jar file
without any provided dependencies. Make sure you set "-Pbuild-jar" when
packaging your program.

  - You could also set "classloader.resolve-order: parent-first" in your
configuration to restore the old class loading style.

  - We should add "scala" to the default value for
"classloader.parent-first-patterns". You can add it yourself in the
configuration (make sure you keep all existing parent-first-patterns as
well).



On Thu, Dec 14, 2017 at 12:14 PM, Till Rohrmann <trohrm...@apache.org>
wrote:

> Hi,
>
> I tried to reproduce the problem given your description Ryan. I submitted
> the test job to a vanilla Flink 1.4.0 cluster (Hadoop-free version
> downloaded from flink.apache.org, Hadoop 2.7 version donwloaded from
> flink.apache.org and a cluster built from sources). However, I was not
> able
> to reproduce the problem. Therefore I suspect that it has something to do
> with your or my setup.
>
> In order to further diagnose the problem, it would be tremendously helpful
> if you could share the logs contained in the logs directory with us.
>
> Cheers,
> Till
>
> On Thu, Dec 14, 2017 at 11:19 AM, Stephan Ewen <se...@apache.org> wrote:
>
> > Hi!
> >
> > This may be due to the changed classloading semantics.
> >
> > Just to verify this, can you check if it gets solved by setting the
> > following in the Flink configuration: "classloader.resolve-order: parent-
> > first"
> >
> > By default, Flink 1.4 uses now inverted classloading to allow users to
> use
> > their own copies of dependencies, irrespective of what the underlying
> > classpath is spoiled with. You can for example use a different Avro
> > versions than Hadoop pull in, even without shading, or even different
> Akka
> > / Jackson / etc versions.
> >
> > That is a nice improvement, but it may have some impacts on tools that
> have
> > been build before. When you see classcast exceptions (like X cannot be
> cast
> > to X), that is probably caused by the fact that the classloader
> duplicates
> > a dependency from the JVM classpath in user-space, but objects/classes
> move
> > between the domains.
> >
> > Stephan
> >
> > On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma <28shivamsha...@gmail.com
> >
> > wrote:
> >
> > > Same Issue I am facing :-
> > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.
> > > com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
> > >
> > > Can anyone explain the exception
> > >
> > > Thanks
> > >
> > > On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA) <j...@apache.org>
> > > wrote:
> > >
> > > > Ryan Brideau created FLINK-8256:
> > > > -----------------------------------
> > > >
> > > >              Summary: Cannot use Scala functions to filter in 1.4 -
> > > > java.lang.ClassCastException
> > > >                  Key: FLINK-8256
> > > >                  URL: https://issues.apache.org/
> jira/browse/FLINK-8256
> > > >              Project: Flink
> > > >           Issue Type: Bug
> > > >           Components: DataStream API
> > > >     Affects Versions: 1.4.0
> > > >          Environment: macOS, Local Flink v1.4.0, Scala 2.11
> > > >             Reporter: Ryan Brideau
> > > >
> > > >
> > > > I built the newest release locally today, but when I try to filter a
> > > > stream using an anonymous or named function, I get an error. Here's a
> > > > simple example:
> > > >
> > > >
> > > > {code:java}
> > > > import org.apache.flink.api.java.utils.ParameterTool
> > > > import org.apache.flink.streaming.api.scala._
> > > >
> > > > object TestFunction {
> > > >
> > > >   def main(args: Array[String]): Unit = {
> > > >
> > > >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > >     val params = ParameterTool.fromArgs(args)
> > > >     env.getConfig.setGlobalJobParameters(params)
> > > >
> > > >     val someArray = Array(1,2,3)
> > > >     val stream = env.fromCollection(someArray).filter(_ => true)
> > > >     stream.print().setParallelism(1)
> > > >     env.execute("Testing Function")
> > > >   }
> > > > }
> > > > {code}
> > > >
> > > > This results in:
> > > >
> > > >
> > > > {code:java}
> > > > Job execution switched to status FAILING.
> > > > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> > > > instantiate user function.
> > > >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > > > getStreamOperator(StreamConfig.java:235)
> > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > > > createChainedOperator(OperatorChain.java:355)
> > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > > > createOutputCollector(OperatorChain.java:282)
> > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<
> > > > init>(OperatorChain.java:126)
> > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> > > > invoke(StreamTask.java:231)
> > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.
> > java:718)
> > > >         at java.lang.Thread.run(Thread.java:748)
> > > > Caused by: java.lang.ClassCastException: cannot assign instance of
> > > > org.peopleinmotion.TestFunction$$anonfun$1 to field
> > > > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6
> of
> > > > type scala.Function1 in instance of org.apache.flink.streaming.
> > > > api.scala.DataStream$$anon$7
> > > >         at java.io.ObjectStreamClass$FieldReflector.
> setObjFieldValues(
> > > > ObjectStreamClass.java:2233)
> > > >         at java.io.ObjectStreamClass.setObjFieldValues(
> > > > ObjectStreamClass.java:1405)
> > > >         at java.io.ObjectInputStream.defaultReadFields(
> > > > ObjectInputStream.java:2288)
> > > >         at java.io.ObjectInputStream.readSerialData(
> > > > ObjectInputStream.java:2206)
> > > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > > ObjectInputStream.java:2064)
> > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > > > java:1568)
> > > >         at java.io.ObjectInputStream.defaultReadFields(
> > > > ObjectInputStream.java:2282)
> > > >         at java.io.ObjectInputStream.readSerialData(
> > > > ObjectInputStream.java:2206)
> > > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > > ObjectInputStream.java:2064)
> > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > > > java:1568)
> > > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.
> > > > java:428)
> > > >         at org.apache.flink.util.InstantiationUtil.
> deserializeObject(
> > > > InstantiationUtil.java:290)
> > > >         at org.apache.flink.util.InstantiationUtil.
> > readObjectFromConfig(
> > > > InstantiationUtil.java:248)
> > > >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > > > getStreamOperator(StreamConfig.java:220)
> > > >         ... 6 more
> > > > 12/13/2017 15:10:01     Job execution switched to status FAILED.
> > > >
> > > > ------------------------------------------------------------
> > > >  The program finished with the following exception:
> > > >
> > > > org.apache.flink.client.program.ProgramInvocationException: The
> > program
> > > > execution failed: Job execution failed.
> > > >         at org.apache.flink.client.program.ClusterClient.run(
> > > > ClusterClient.java:492)
> > > >         at org.apache.flink.client.program.StandaloneClusterClient.
> > > > submitJob(StandaloneClusterClient.java:105)
> > > >         at org.apache.flink.client.program.ClusterClient.run(
> > > > ClusterClient.java:456)
> > > >         at org.apache.flink.streaming.api.environment.
> > > > StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> > > >         at org.apache.flink.streaming.api.scala.
> > > > StreamExecutionEnvironment.execute(StreamExecutionEnvironment.
> > scala:638)
> > > >         at org.peopleinmotion.TestFunction$.main(
> > TestFunction.scala:20)
> > > >         at org.peopleinmotion.TestFunction.main(TestFunction.scala)
> > > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> > > >         at sun.reflect.NativeMethodAccessorImpl.invoke(
> > > > NativeMethodAccessorImpl.java:62)
> > > >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > > > DelegatingMethodAccessorImpl.java:43)
> > > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > > >         at org.apache.flink.client.program.PackagedProgram.
> > callMainMeth
> > > od(
> > > > PackagedProgram.java:525)
> > > >         at org.apache.flink.client.program.PackagedProgram.
> > > > invokeInteractiveModeForExecution(PackagedProgram.java:417)
> > > >         at org.apache.flink.client.program.ClusterClient.run(
> > > > ClusterClient.java:396)
> > > >         at org.apache.flink.client.CliFrontend.executeProgram(
> > > > CliFrontend.java:802)
> > > >         at org.apache.flink.client.CliFrontend.run(CliFrontend.
> > java:282)
> > > >         at org.apache.flink.client.CliFrontend.parseParameters(
> > > > CliFrontend.java:1054)
> > > >         at org.apache.flink.client.CliFrontend$1.call(
> > > > CliFrontend.java:1101)
> > > >         at org.apache.flink.client.CliFrontend$1.call(
> > > > CliFrontend.java:1098)
> > > >         at java.security.AccessController.doPrivileged(Native
> Method)
> > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >         at org.apache.hadoop.security.UserGroupInformation.doAs(
> > > > UserGroupInformation.java:1556)
> > > >         at org.apache.flink.runtime.security.HadoopSecurityContext.
> > > > runSecured(HadoopSecurityContext.java:41)
> > > >         at org.apache.flink.client.CliFrontend.main(CliFrontend.
> java:
> > > 1098)
> > > > Caused by: org.apache.flink.runtime.client.JobExecutionException:
> Job
> > > > execution failed.
> > > >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$
> > > > mcV$sp(JobManager.scala:897)
> > > >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobMana
> > > ger.scala:840)
> > > >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobMana
> > > ger.scala:840)
> > > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> > > > liftedTree1$1(Future.scala:24)
> > > >         at scala.concurrent.impl.Future$
> PromiseCompletingRunnable.run(
> > > > Future.scala:24)
> > > >         at akka.dispatch.TaskInvocation.
> run(AbstractDispatcher.scala:
> > 39)
> > > >         at akka.dispatch.ForkJoinExecutorConfigurator$
> > > > AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> > > >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> > > > ForkJoinTask.java:260)
> > > >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> > > > runTask(ForkJoinPool.java:1339)
> > > >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> > > > ForkJoinPool.java:1979)
> > > >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> > > > ForkJoinWorkerThread.java:107)
> > > > Caused by: org.apache.flink.streaming.runtime.tasks.
> > StreamTaskException:
> > > > Cannot instantiate user function.
> > > >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > > > getStreamOperator(StreamConfig.java:235)
> > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > > > createChainedOperator(OperatorChain.java:355)
> > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > > > createOutputCollector(OperatorChain.java:282)
> > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<
> > > > init>(OperatorChain.java:126)
> > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> > > > invoke(StreamTask.java:231)
> > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.
> > java:718)
> > > >         at java.lang.Thread.run(Thread.java:748)
> > > > Caused by: java.lang.ClassCastException: cannot assign instance of
> > > > org.peopleinmotion.TestFunction$$anonfun$1 to field
> > > > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6
> of
> > > > type scala.Function1 in instance of org.apache.flink.streaming.
> > > > api.scala.DataStream$$anon$7
> > > >         at java.io.ObjectStreamClass$FieldReflector.
> setObjFieldValues(
> > > > ObjectStreamClass.java:2233)
> > > >         at java.io.ObjectStreamClass.setObjFieldValues(
> > > > ObjectStreamClass.java:1405)
> > > >         at java.io.ObjectInputStream.defaultReadFields(
> > > > ObjectInputStream.java:2288)
> > > >         at java.io.ObjectInputStream.readSerialData(
> > > > ObjectInputStream.java:2206)
> > > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > > ObjectInputStream.java:2064)
> > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > > > java:1568)
> > > >         at java.io.ObjectInputStream.defaultReadFields(
> > > > ObjectInputStream.java:2282)
> > > >         at java.io.ObjectInputStream.readSerialData(
> > > > ObjectInputStream.java:2206)
> > > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > > ObjectInputStream.java:2064)
> > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > > > java:1568)
> > > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.
> > > > java:428)
> > > >         at org.apache.flink.util.InstantiationUtil.
> deserializeObject(
> > > > InstantiationUtil.java:290)
> > > >         at org.apache.flink.util.InstantiationUtil.
> > readObjectFromConfig(
> > > > InstantiationUtil.java:248)
> > > >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > > > getStreamOperator(StreamConfig.java:220)
> > > >
> > > > {code}
> > > >
> > > > However, replacing the function with this results in everything
> working
> > > as
> > > > expected:
> > > >
> > > > {code:java}
> > > > val stream = env.fromCollection(someArray).filter(new
> > > FilterFunction[Int]
> > > > {
> > > >       override def filter(t: Int): Boolean = true
> > > >     })
> > > > {code}
> > > >
> > > > Perhaps something changed in the new build compared to the previous,
> as
> > > > this was working without issue before?
> > > >
> > > >
> > > >
> > > > --
> > > > This message was sent by Atlassian JIRA
> > > > (v6.4.14#64029)
> > > >
> > >
> > >
> > >
> > > --
> > > Shivam Sharma
> > > Data Engineer @ Goibibo
> > > Indian Institute Of Information Technology, Design and Manufacturing
> > > Jabalpur
> > > Mobile No- (+91) 8882114744
> > > Email:- 28shivamsha...@gmail.com
> > > LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
> > > <https://www.linkedin.com/in/28shivamsharma>*
> > >
> >
>

Reply via email to