Abhilash created SPARK-45860:
--------------------------------
Summary: ClassCastException with SerializedLambda in Spark Cluster Mode
Key: SPARK-45860
URL: https://issues.apache.org/jira/browse/SPARK-45860
Project: Spark
Issue Type: Bug
Components: Spark Core, Spark Submit
Affects Versions: 3.4.1, 3.2.1
Environment: Java 11, Spring Boot 2.7.10, Spark 3.2.1
Reporter: Abhilash
h3. Issue Description
Running a Spark application in cluster mode fails with a {{java.lang.ClassCastException}} involving {{java.lang.invoke.SerializedLambda}}. The issue appears to be specific to Spark cluster mode. It does not occur when the application runs locally without Spring Boot.
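For context, Java serializes a serializable lambda by replacing it with a {{java.lang.invoke.SerializedLambda}}. On the receiving JVM, {{SerializedLambda.readResolve}} turns it back into a functional object by calling the synthetic {{$deserializeLambda$}} method that javac generates in the capturing class. The minimal plain-JDK sketch below shows the successful round trip inside a single JVM. It uses no Spark, and the class name {{LambdaRoundTrip}} is hypothetical. The exception in this report means that on the executor, the object was left as a raw {{SerializedLambda}} instead of being resolved.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.function.Function;

class LambdaRoundTrip {

    // The intersection cast makes the lambda serializable, so javac emits a
    // $deserializeLambda$ method in this (capturing) class.
    static Function<String, Integer> serializableLength() {
        return (Function<String, Integer> & Serializable) s -> s.length();
    }

    // Writes the object with Java serialization (a lambda is replaced by a
    // SerializedLambda via writeReplace) and reads it back (readResolve turns
    // the SerializedLambda back into a lambda instance).
    static Object roundTrip(Object obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Object restored = roundTrip(serializableLength());
        // Resolved successfully: no longer a raw SerializedLambda.
        System.out.println(restored instanceof SerializedLambda);                   // prints false
        System.out.println(((Function<String, Integer>) restored).apply("Andrii")); // prints 6
    }
}
{code}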
h3. Steps to Reproduce
# Create a dummy dataset
{code:java}
Dataset<String> dummyData = spark.createDataset(Arrays.asList("Abhi", "Andrii",
"Rick", "Duc"), Encoders.STRING()); {code}
# Call {{flatMap}} with a custom {{FlatMapFunction}} to transform the data
{code:java}
Dataset<TestData> transformedData = dummyData.flatMap(new TestDataFlatMap(),
Encoders.bean(TestData.class)); {code}
# Call any action on the transformed dataset
{code:java}
transformedData.show(); {code}
# Submit the application with {{spark-submit}} in cluster mode. With Spring Boot included, the job fails with the {{ClassCastException}} shown in the stack trace below.
h3. Complete Code
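{code:java}
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;

@SpringBootApplication(exclude = {GsonAutoConfiguration.class})
public class SampleSparkJob {

    public static void main(String[] args) {
        // Start the Spring Boot application context.
        SpringApplication.run(SampleSparkJob.class, args);

        // The master is supplied by spark-submit (--master). A master set here in code
        // (e.g. "local[*]") would take precedence over the spark-submit flag.
        SparkSession spark = SparkSession.builder()
                .appName("SampleSparkJob")
                .getOrCreate();

        Dataset<String> dummyData = spark.createDataset(
                Arrays.asList("Abhi", "Andrii", "Rick", "Duc"), Encoders.STRING());

        // Map each name to a TestData bean.
        Dataset<TestData> transformedData =
                dummyData.flatMap(new TestDataFlatMap(), Encoders.bean(TestData.class));

        transformedData.show();
        transformedData.write().mode("append").parquet("outputpath");
        spark.stop();
    }
}{code}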
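{code:java}
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;

// Emits exactly one TestData per input name.
// FlatMapFunction already extends Serializable; the explicit interface is redundant but harmless.
class TestDataFlatMap implements FlatMapFunction<String, TestData>, Serializable {
    @Override
    public Iterator<TestData> call(String name) {
        return Collections.singletonList(new TestData(name)).iterator();
    }
}{code}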
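{code:java}
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Encoders.bean expects a JavaBean: getters/setters (generated by @Data) and a no-arg constructor.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TestData implements Serializable {
    private String name;
}{code}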
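{{Encoders.bean}} derives the schema from JavaBean-style properties. By convention, a JavaBean also has a public no-argument constructor in addition to getters and setters. Note that {{@Data}} combined with {{@AllArgsConstructor}} does not generate a no-argument constructor on its own. Below is a rough, Lombok-free sketch of {{TestData}} written as a JavaBean (the names {{BeanShapeCheck}} and {{TestDataBean}} are hypothetical). It includes a check that uses {{java.beans.Introspector}} to confirm {{name}} is exposed as a readable and writable property. This only verifies the bean's shape and is not claimed to explain the {{ClassCastException}}.

{code:java}
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;

class BeanShapeCheck {

    // Hypothetical Lombok-free equivalent of TestData written as a JavaBean
    // (equals/hashCode/toString, which @Data also generates, are omitted).
    public static class TestDataBean implements Serializable {
        private String name;

        public TestDataBean() { }  // public no-arg constructor

        public TestDataBean(String name) { this.name = name; }

        public String getName() { return name; }

        public void setName(String name) { this.name = name; }
    }

    // True if the class exposes `property` with both a getter and a setter.
    static boolean isReadWriteProperty(Class<?> beanClass, String property) {
        try {
            // Stop at Object so the inherited "class" property is ignored.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getName().equals(property)) {
                    return pd.getReadMethod() != null && pd.getWriteMethod() != null;
                }
            }
            return false;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    // True if the class has a public constructor taking no arguments.
    static boolean hasPublicNoArgConstructor(Class<?> beanClass) {
        try {
            beanClass.getConstructor();  // getConstructor only finds public constructors
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isReadWriteProperty(TestDataBean.class, "name"));  // prints true
        System.out.println(hasPublicNoArgConstructor(TestDataBean.class));    // prints true
    }
}
{code}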
h3. Stack Trace
{code:java}
WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.248.66.38 executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
	at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
	at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
	at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
	at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
{code}
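The exception message shows that the executor ended up with a raw {{SerializedLambda}} where {{MapPartitionsRDD.f}} expects a {{scala.Function3}}. In other words, the lambda was never resolved back into a function. That resolution happens in {{SerializedLambda.readResolve}}, which requires the receiving side to be able to load the capturing class recorded inside the {{SerializedLambda}}. As a debugging aid, the plain-JDK sketch below (the class name {{LambdaInspector}} is hypothetical) obtains the {{SerializedLambda}} that a serializable lambda is written as. It does this by reflectively invoking the private {{writeReplace}} method that the JDK generates for serializable lambda classes, then prints the class names the receiving JVM must be able to load. It relies on a JDK implementation detail rather than a public API, so treat it as a diagnostic only.

{code:java}
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Function;

class LambdaInspector {

    // Returns the SerializedLambda that Java serialization would write for a
    // serializable lambda. Uses the private writeReplace() method generated for
    // serializable lambda classes (JDK implementation detail, not a public API).
    static SerializedLambda describe(Serializable lambda) {
        try {
            Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return (SerializedLambda) writeReplace.invoke(lambda);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("not a serializable lambda: " + lambda.getClass(), e);
        }
    }

    public static void main(String[] args) {
        Function<String, Integer> length = (Function<String, Integer> & Serializable) s -> s.length();
        SerializedLambda sl = describe((Serializable) length);
        // Class names are reported in internal form, e.g. java/util/function/Function.
        System.out.println("capturing class:      " + sl.getCapturingClass());
        System.out.println("functional interface: " + sl.getFunctionalInterfaceClass());
        System.out.println("implementation:       " + sl.getImplClass() + "." + sl.getImplMethodName());
    }
}
{code}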
h3. Additional Information
The issue appears to be related to Spring Boot auto-configuration or to the dependencies that Spring Boot brings onto the application classpath.
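One way to narrow down the dependency suspicion is to log which jar each relevant class is loaded from, both in the driver and inside a task. Duplicate or mismatched Scala or Spark classes packaged with the Spring Boot application would then appear at unexpected locations. The helper below is a hypothetical plain-JDK sketch, not a Spark or Spring API. The default class names it checks ({{scala.Function3}} and {{org.apache.spark.rdd.MapPartitionsRDD}}) are taken from the stack trace; whether they reveal a conflict in this case is an assumption to be verified.

{code:java}
import java.net.URL;
import java.security.CodeSource;

class ClassOrigin {

    // Describes where the named class is loaded from, as seen by `loader`
    // (a null loader means the bootstrap loader). JDK classes loaded by the
    // bootstrap loader have no code source.
    static String whereLoadedFrom(String className, ClassLoader loader) {
        try {
            Class<?> cls = Class.forName(className, false, loader);
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            String origin = (location == null) ? "bootstrap/JDK" : location.toString();
            return className + " -> " + origin + " (loader: " + cls.getClassLoader() + ")";
        } catch (ClassNotFoundException e) {
            return className + " -> not found";
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        String[] names = args.length > 0 ? args
                : new String[] {"scala.Function3", "org.apache.spark.rdd.MapPartitionsRDD"};
        for (String name : names) {
            System.out.println(whereLoadedFrom(name, loader));
        }
    }
}
{code}

To compare the driver with the executors, the same call could be made from inside a task, for example within a {{FlatMapFunction}}, and the result logged there.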