regis le bretonnic created SPARK-54712:
------------------------------------------
Summary: serialization error with spark streaming
Key: SPARK-54712
URL: https://issues.apache.org/jira/browse/SPARK-54712
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 3.5.3
Environment: spark : 3.5.3
scala : 2.12.18
java : 11.0.26
Reporter: regis le bretonnic
Hello
I'm hitting an error in a very simple Structured Streaming test.
The following fails:
{quote}import org.apache.spark.sql.streaming.Trigger

// Read the topic from Kafka, starting at the latest offsets
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "mykafka:9092")
  .option("subscribe", "mytopic")
  .option("startingOffsets", "latest")
  .load()
val processedDF = kafkaDF.select("topic","key","value")
// Write each one-minute micro-batch to Parquet
processedDF.writeStream
  .format("parquet")
  .option("path","/tmp/data/kafka/mytopic")
  .option("checkpointLocation","/tmp/checkpoint/kafka/mytopic")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode("append")
  .start()
  .awaitTermination(){quote}
The error is:
{quote}Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartitions of type scala.collection.Seq in instance of org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829){quote}
The same script works fine if I replace the Kafka source with the rate source:
{quote}val rateDF = spark.readStream
.format("rate")
.option("rowsPerSecond", 10)
.option("rampUpTime", 5)
.option("numPartitions", 2)
.load()
val processedDF = rateDF.select("timestamp","value"){quote}
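A ClassCastException involving List$SerializationProxy during task deserialization is often reported when the driver and the executors load different Scala or Spark jars. Nothing in this report confirms that this is the cause here. The following is only a minimal diagnostic sketch, assuming it is run in the same session (for example spark-shell) as the failing job. The names driverVersions and executorVersions are illustrative, and the partition count of 10 is arbitrary.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: reuse the session that runs the failing query (as in spark-shell).
val spark = SparkSession.builder().getOrCreate()

// (Scala, Java, Spark) versions as seen by the driver JVM.
val driverVersions = (
  scala.util.Properties.versionNumberString,
  System.getProperty("java.version"),
  org.apache.spark.SPARK_VERSION
)

// The same triple evaluated inside tasks, i.e. on the executor JVMs.
// distinct() collapses identical answers so only differing setups remain.
val executorVersions = spark.sparkContext
  .parallelize(1 to 100, 10)
  .map(_ => (
    scala.util.Properties.versionNumberString,
    System.getProperty("java.version"),
    org.apache.spark.SPARK_VERSION
  ))
  .distinct()
  .collect()

println(s"driver:   $driverVersions")
executorVersions.foreach(v => println(s"executor: $v"))
```

If the printed triples differ between driver and executors, or several executor triples appear, the classpath would be worth checking first. Matching triples do not rule out a conflict, since the Kafka connector jar itself could be built for another Scala version or be visible only to some class loaders.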