[ https://issues.apache.org/jira/browse/SPARK-54712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18045778#comment-18045778 ]
Jungtaek Lim commented on SPARK-54712:
--------------------------------------
Hi, thanks for reporting. I'm lowering the severity, since Critical and Blocker
are reserved for committers; a committer will bump the severity based on the
justification.
This tends to happen when the Scala version is mismatched between the app you
packaged and Spark. You've mentioned the Scala version is 2.12.18, which is
indeed the expected version. Could you please recheck your app to see whether
you have pulled a Scala artifact from somewhere other than Spark? Alternatively,
could you verify that your app does not pull in multiple versions of the Spark
artifacts as dependencies?
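For illustration, here is a minimal sketch of the dependency setup I have in
mind, assuming an sbt build (the coordinates are the standard Spark artifacts;
the {{provided}} scoping is the usual way to keep the cluster's own Spark and
Scala jars authoritative instead of a bundled copy):
{quote}// build.sbt (sketch): pin one Spark version and let the cluster provide it
ThisBuild / scalaVersion := "2.12.18" // must match the Scala binary version of the Spark 3.5.3 build

val sparkVersion = "3.5.3"

libraryDependencies ++= Seq(
  // "provided": supplied by the Spark runtime, not packaged into the application jar
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  // the Kafka source is not bundled with the Spark distribution, so it ships with the app
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
{quote}
On sbt 1.4+ you can then run {{sbt dependencyTree}} and confirm that no second
Spark or Scala version is pulled in transitively.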
> serialization error with spark streaming
> ----------------------------------------
>
> Key: SPARK-54712
> URL: https://issues.apache.org/jira/browse/SPARK-54712
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.5.3
> Environment: Spark: 3.5.3
> Scala: 2.12.18
> Java: 11.0.26
> Reporter: regis le bretonnic
> Priority: Major
>
> Hello
> I'm facing an error when doing a very simple test with Structured Streaming.
> This fails:
> {quote}val kafkaDF = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "mykafka:9092")
>   .option("subscribe", "mytopic")
>   .option("startingOffsets", "latest")
>   .load()
> val processedDF = kafkaDF.select("topic", "key", "value")
> processedDF.writeStream
>   .format("parquet")
>   .option("path", "/tmp/data/kafka/mytopic")
>   .option("checkpointLocation", "/tmp/checkpoint/kafka/mytopic")
>   .trigger(Trigger.ProcessingTime("1 minute"))
>   .outputMode("append")
>   .start()
>   .awaitTermination()
> {quote}
> The error is:
> {quote}Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartitions of type scala.collection.Seq in instance of org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
>   at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
>   at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
>   at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
>   at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
>   at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
>   at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
>   at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
>   at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
>   at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
>   at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
>   at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
>   at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
>   at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> {quote}
> The same script works fine if I replace the Kafka stream with the rate
> format:
> {quote}val rateDF = spark.readStream
>   .format("rate")
>   .option("rowsPerSecond", 10)
>   .option("rampUpTime", 5)
>   .option("numPartitions", 2)
>   .load()
> val processedDF = rateDF.select("timestamp", "value")
> {quote}