[
https://issues.apache.org/jira/browse/SPARK-29497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749003#comment-17749003
]
Herman van Hövell commented on SPARK-29497:
-------------------------------------------
This all seems to boil down to the fact that you cannot deserialize an object
graph that contains a self reference when that self-reference is written to the
stream as a serialization proxy. In that case readResolve is never called
(because the proxy's inputs are not fully deserialized yet), and you get a
ClassCastException when the stream tries to assign the proxy to a field of the
real type. In this particular case the proxy is
java.lang.invoke.SerializedLambda. See
[the Java serialization specification|https://docs.oracle.com/en/java/javase/17/docs/specs/serialization/input.html#the-readresolve-method]
for more information.
All of the examples mentioned here contain such a self reference, in the form
of lambda -> parent -> lambda. An even simpler reproduction would be the following:
{code:scala}
case class SelfRef(start: Int) extends Serializable {
  val method: Int => Int = (i: Int) => i + start
}
...
SparkSerDeUtils.deserialize[Int => Int](SparkSerDeUtils.serialize(SelfRef(43).method)) // KABOOM
{code}
As far as mitigations go, there is not much we can do besides improving error
handling (hard, because the observed exception also masks genuine dependency
problems) and documenting which patterns to avoid.
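One pattern that sidesteps the cycle (a sketch only, not an official fix; {{SelfRefFixed}} is a hypothetical name) is to copy the captured field into a local before constructing the lambda, so the closure captures the value rather than the enclosing instance:

{code:scala}
// Sketch: break the lambda -> parent -> lambda cycle. Accessing `start`
// directly inside the lambda captures `this`; copying it into a local
// first means the closure only captures that local value.
case class SelfRefFixed(start: Int) extends Serializable {
  def method: Int => Int = {
    val s = start        // local copy; the lambda closes over `s`, not `this`
    (i: Int) => i + s    // serializes as a standalone SerializedLambda
  }
}
{code}

Because the closure no longer references the parent object, its serialization proxy never has to be assigned back into a partially deserialized instance.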
> Cannot assign instance of java.lang.invoke.SerializedLambda to field
> --------------------------------------------------------------------
>
> Key: SPARK-29497
> URL: https://issues.apache.org/jira/browse/SPARK-29497
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.3, 3.0.1, 3.2.0
> Environment: Spark 2.4.3 Scala 2.12
> Spark 3.2.0 Scala 2.13.5 (Java 11.0.12)
> Reporter: Rob Russo
> Priority: Major
>
> Note this is for scala 2.12:
> There seems to be an issue in Spark with serializing a UDF that is created
> from a function assigned to a class member that references another function
> assigned to a class member. This is similar to
> https://issues.apache.org/jira/browse/SPARK-25047, but it looks like that
> resolution does not cover this case. After trimming it down to the base
> issue, I came up with the following reproduction:
>
>
> {code:scala}
> object TestLambdaShell extends Serializable {
>   val hello: String => String = s => s"hello $s!"
>   val lambdaTest: String => String = hello( _ )
>   def functionTest: String => String = hello( _ )
> }
>
> val hello = udf( TestLambdaShell.hello )
> val functionTest = udf( TestLambdaShell.functionTest )
> val lambdaTest = udf( TestLambdaShell.lambdaTest )
>
> sc.parallelize(Seq("world"),1).toDF("test").select(hello($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(functionTest($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(lambdaTest($"test")).show(1)
> {code}
>
> All of which works except the last line, which results in an exception on the
> executors:
>
> {code:java}
> Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field $$$82b5b23cea489b2712a1db46c77e458$$$$w$TestLambdaShell$.lambdaTest of type scala.Function1 in instance of $$$82b5b23cea489b2712a1db46c77e458$$$$w$TestLambdaShell$
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
> at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>
> In Spark 2.2.x I used a class that had something like this and it worked
> fine. Since upgrading to Scala 2.12 we have run into serialization issues in
> a few places; most were solved by extending Serializable, but this case was
> not fixed by that.
>
> This also happens regardless of whether it's done in the shell or in a jar.
>
>
>
> So after much more debugging, this turns out to be some weird mix of Scala
> 2.12.0 and Scala 2.12.8. Spark is compiled against 2.12.8, and so is our own
> code, but I noticed that the Maven-compiled class did not match the class
> produced by running the 2.12.8 scalac directly. After a lot of digging we
> realized that scala-compiler indirectly depends on scala-library 2.12.0, and
> only when the Spark dependency is added does the build start using it for
> some reason. Without the Spark dependency, with direct Scala 2.12.8
> dependencies only, the code builds fine and compiles correctly against 2.12.8.
>
> We were able to fix this with the following scala-maven-plugin configuration:
>
> {code:xml}
> <failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
> <scalaCompatVersion>2.12</scalaCompatVersion>
> <scalaVersion>2.12.8</scalaVersion>
> {code}
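> A complementary guard (a sketch only; the plugin is the standard
> maven-enforcer-plugin, and the rule shown here is an assumption about how
> one might pin the version, not something from the Spark build) is to fail
> the build outright if any scala-library other than the pinned one reaches
> the classpath:
>
> {code:xml}
> <plugin>
>   <groupId>org.apache.maven.plugins</groupId>
>   <artifactId>maven-enforcer-plugin</artifactId>
>   <executions>
>     <execution>
>       <id>enforce-single-scala-library</id>
>       <goals><goal>enforce</goal></goals>
>       <configuration>
>         <rules>
>           <!-- Ban every scala-library except the version we compile against -->
>           <bannedDependencies>
>             <excludes>
>               <exclude>org.scala-lang:scala-library</exclude>
>             </excludes>
>             <includes>
>               <include>org.scala-lang:scala-library:2.12.8</include>
>             </includes>
>           </bannedDependencies>
>         </rules>
>       </configuration>
>     </execution>
>   </executions>
> </plugin>
> {code}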
>
> And this resolves the issue for the jars we build ourselves and link against
> Spark. However, my original test case still reproduces in the spark-shell
> (and, for us, in Apache Zeppelin), so it seems they are somehow also
> compiling against 2.12.0, though I'm not quite sure how. The Spark pom.xml
> appears to have the fail-on-multiple-versions check and compiles fine, so
> I'm not sure how this is happening, but at least it's more isolated now.
> I'm also wondering whether anything else could be affected by this.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)