[
https://issues.apache.org/jira/browse/SPARK-29497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498239#comment-17498239
]
Jay Sen edited comment on SPARK-29497 at 2/25/22, 6:14 PM:
-----------------------------------------------------------
Some more verification and additional info:
I was able to reproduce this on both spark-3.2.1 (Scala 2.12) and spark-3.2.1 (Scala 2.13) - hadoop3.2, in the shell as well as via Java code.
Local Scala and Java versions:
{code:none}
❯ scala -version
Scala code runner version 2.12.14 -- Copyright 2002-2021, LAMP/EPFL and Lightbend, Inc.
❯ java --version
openjdk 11.0.12 2021-07-20
{code}
In my case, I am using map (the same happens with mapPartitions).
Example: a simple Function with a dummy reducer:
{code:java}
JavaRDD<Object> rdd = data.toJavaRDD().map(
    new Function<Row, Object>() {
        @Override
        public Object call(Row v1) throws Exception {
            return v1 != null;
        }
    }
);
Object result = rdd.reduce(LocalClass::reduceDummy);

// defined in LocalClass:
public static Object reduceDummy(Object a, Object b) {
    return null;
}
{code}
Error:
{code:java}
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
{code}
Stacktrace:
{code:java}
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2205)
    at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2168)
    at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1422)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2480)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2387)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
{code}
*It works fine via the Spark test framework (com.holdenkarau:spark-testing-base_2.12), but not on Spark standalone.*
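
A note from further digging (an assumption on my part, not verified for this exact case): on a standalone cluster this particular ClassCastException is often a symptom of the executors missing the application classes on their classpath, so the deserializer cannot resolve the {{SerializedLambda}} back into the original function class; that would also explain why the in-JVM test-framework run works. A minimal sketch of shipping the application jar, where the master URL and jar path are placeholders:
{code:scala}
import org.apache.spark.sql.SparkSession

// Sketch only: replace the master URL and jar path with real values.
val spark = SparkSession.builder()
  .master("spark://master-host:7077")
  .appName("serialized-lambda-repro")
  // Ship the application jar so executor classloaders can resolve the
  // classes behind deserialized lambdas.
  .config("spark.jars", "/path/to/app.jar")
  .getOrCreate()
{code}
The same effect can be had by passing the application jar (or --jars) to spark-submit.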
> Cannot assign instance of java.lang.invoke.SerializedLambda to field
> --------------------------------------------------------------------
>
> Key: SPARK-29497
> URL: https://issues.apache.org/jira/browse/SPARK-29497
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.3, 3.0.1, 3.2.0
> Environment: Spark 2.4.3 Scala 2.12
> Spark 3.2.0 Scala 2.13.5 (Java 11.0.12)
> Reporter: Rob Russo
> Priority: Major
>
> Note this is for Scala 2.12:
> There seems to be an issue in Spark with serializing a UDF that is created
> from a function assigned to a class member, where that function references
> another function assigned to a class member. This is similar to
> https://issues.apache.org/jira/browse/SPARK-25047, but it looks like that
> resolution does not cover this case. After trimming it down to the base
> issue, I came up with the following to reproduce:
>
>
> {code:scala}
> object TestLambdaShell extends Serializable {
>   val hello: String => String = s => s"hello $s!"
>   val lambdaTest: String => String = hello( _ )
>   def functionTest: String => String = hello( _ )
> }
>
> val hello = udf( TestLambdaShell.hello )
> val functionTest = udf( TestLambdaShell.functionTest )
> val lambdaTest = udf( TestLambdaShell.lambdaTest )
>
> sc.parallelize(Seq("world"),1).toDF("test").select(hello($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(functionTest($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(lambdaTest($"test")).show(1)
> {code}
>
> All of which work except the last line, which results in an exception on the
> executors:
>
> {code:java}
> Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field $$$82b5b23cea489b2712a1db46c77e458$$$$w$TestLambdaShell$.lambdaTest of type scala.Function1 in instance of $$$82b5b23cea489b2712a1db46c77e458$$$$w$TestLambdaShell$
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
> at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
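>
> Per the reproduction above, the {{def}}-based {{functionTest}} deserializes fine while the {{val}}-based {{lambdaTest}} does not, which suggests exposing such member lambdas through a {{def}} as a workaround. A hedged sketch, grounded only in the behavior shown above ({{lambdaTestDef}} is an illustrative name):
> {code:scala}
> object TestLambdaShell extends Serializable {
>   val hello: String => String = s => s"hello $s!"
>   // Workaround sketch: a def builds the Function1 at call time on each JVM,
>   // so no object field of type Function1 has to be deserialized.
>   def lambdaTestDef: String => String = hello( _ )
> }
>
> val lambdaTestDef = udf( TestLambdaShell.lambdaTestDef )
> {code}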
>
> In Spark 2.2.x I used a class that had something like this and it worked
> fine. Now that we've upgraded to 2.12 we ran into a few serialization issues
> in places; most were solved by extending Serializable, but this case was not
> fixed by that.
>
> Also, this happens regardless of whether it's run in the shell or from a jar.
>
>
>
> So after much more debugging, this turns out to be some weird mix of Scala
> 2.12.0 and Scala 2.12.8. Spark is compiled on 2.12.8 and so is our own code,
> but I noticed that the Maven-compiled class did not match the class compiled
> directly with 2.12.8 scalac. After a lot of digging we realized that
> scala-compiler actually depends indirectly on scala-library 2.12.0, and for
> some reason the build only starts using it once the Spark dependency is
> added. Without the Spark dependency, with just direct Scala 2.12.8
> dependencies, the code builds fine and compiles correctly as 2.12.8.
>
> We were able to fix this using:
>
> {code:xml}
> <failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
> <scalaCompatVersion>2.12</scalaCompatVersion>
> <scalaVersion>2.12.8</scalaVersion>
> {code}
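>
> For context, these parameters belong to the scala-maven-plugin; a hedged sketch of the surrounding plugin block, assuming the usual net.alchim31.maven coordinates:
> {code:xml}
> <plugin>
>   <groupId>net.alchim31.maven</groupId>
>   <artifactId>scala-maven-plugin</artifactId>
>   <configuration>
>     <failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
>     <scalaCompatVersion>2.12</scalaCompatVersion>
>     <scalaVersion>2.12.8</scalaVersion>
>   </configuration>
> </plugin>
> {code}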
>
> And this resolves the issue for the jars we build ourselves and link against
> Spark. However, my original test case still reproduces in the spark-shell,
> and for us also in Apache Zeppelin, so it seems almost as if they are somehow
> also compiling it on 2.12.0, though I'm not sure how. The Spark pom.xml seems
> to have the fail-on-multiple-versions setting and compiles fine, so I'm not
> quite sure how this is happening, but at least it's more isolated now. I'm
> also wondering if anything else could be affected by this.