[ 
https://issues.apache.org/jira/browse/SPARK-54712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18045778#comment-18045778
 ] 

Jungtaek Lim commented on SPARK-54712:
--------------------------------------

Hi, thanks for reporting. I'm lowering the severity, since Critical and 
Blocker are reserved for committers; a committer will bump the severity based 
on the justification.

This tends to happen when there is a Scala version mismatch between the app 
you packaged and Spark. You mentioned Scala 2.12.18, which is indeed the 
expected version. Could you please recheck your app to see whether you have 
pulled the Scala artifact from somewhere other than Spark? Also, could you 
verify that you are not pulling multiple versions of the Spark artifacts as 
dependencies into your app?
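
For example, here is a minimal build.sbt sketch (assuming an sbt build; the 
artifact list is illustrative, not taken from your project) that pins a single 
Scala/Spark combination and keeps Spark's own jars out of the application jar:

{code:scala}
// Hypothetical build.sbt sketch -- adjust to your actual project.
ThisBuild / scalaVersion := "2.12.18" // must match the Scala line of your Spark 3.5.3 distribution

libraryDependencies ++= Seq(
  // "provided": the cluster supplies these classes at runtime, so the packaged
  // jar does not ship a second (possibly mismatched) copy of Spark/Scala.
  "org.apache.spark" %% "spark-sql" % "3.5.3" % "provided",
  // The Kafka source is not bundled with the Spark distribution, so it does
  // need to be packaged with (or submitted alongside) the app.
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.3"
)
{code}

You can then inspect the resolved graph with {{sbt dependencyTree}} (or 
{{mvn dependency:tree}} on Maven) and look for any _2.13-suffixed artifacts or 
a second Spark version being pulled in.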

> serialization error with spark streaming
> ----------------------------------------
>
>                 Key: SPARK-54712
>                 URL: https://issues.apache.org/jira/browse/SPARK-54712
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.3
>         Environment: spark : 3.5.3
> scala : 2.12.18
> java : 11.0.26
>            Reporter: regis le bretonnic
>            Priority: Major
>
> Hello
> I'm facing an error when doing a very simple test with structured streaming.
> This fails:
> {quote}import org.apache.spark.sql.streaming.Trigger
>
> val kafkaDF = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "mykafka:9092")
>   .option("subscribe", "mytopic")
>   .option("startingOffsets", "latest")
>   .load()
>
> val processedDF = kafkaDF.select("topic", "key", "value")
>
> processedDF.writeStream
>   .format("parquet")
>   .option("path", "/tmp/data/kafka/mytopic")
>   .option("checkpointLocation", "/tmp/checkpoint/kafka/mytopic")
>   .trigger(Trigger.ProcessingTime("1 minute"))
>   .outputMode("append")
>   .start()
>   .awaitTermination()
> {quote}
> The error is:
> {quote}Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartitions of type scala.collection.Seq in instance of org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
>   at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
>   at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
>   at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
>   at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
>   at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
>   at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
>   at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
>   at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
>   at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
>   at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
>   at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
>   at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
>   at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> {quote}
> The same script works fine if I replace the Kafka streaming source with the 
> rate format:
> {quote}val rateDF = spark.readStream
>   .format("rate")
>   .option("rowsPerSecond", 10)
>   .option("rampUpTime", 5)
>   .option("numPartitions", 2)
>   .load()
>
> val processedDF = rateDF.select("timestamp", "value")
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
