regis le bretonnic created SPARK-54712:
------------------------------------------

             Summary: serialization error with spark streaming
                 Key: SPARK-54712
                 URL: https://issues.apache.org/jira/browse/SPARK-54712
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.5.3
         Environment: spark : 2.5.3
scala : 2.12.18
java : 11.0.26
            Reporter: regis le bretonnic


Hello

I'm facing an error when doing a very simple test with structured streaming.
This fails :


{quote}{{*val kafkaDF = spark.readStream*}}
{{ *.format("kafka")*}}
{{ *.option("kafka.bootstrap.servers", "mykafka:9092")*}}
{{ *.option("subscribe", "mytopic")*}}
{{ *.option("startingOffsets", "latest")*}}
{{ *.load*}}

{{*val processedDF = kafkaDF.select("topic","key","value")*}}

{{*processedDF.writeStream*}}
{{ *.format("parquet")*}}
{{ *.option("path","/tmp/data/kafka/mytopic")*}}
{{ *.option("checkpointLocation","/tmp/checkpoint/kafka/mytopic")*}}
{{ *.trigger(Trigger.ProcessingTime("1 minute"))*}}
{{ *.outputMode("append")*}}
{{ *.start()*}}
{{ *.awaitTermination()*}}{quote}

{{{*}{*}The error is :}}
{quote}Caused by: java.lang.ClassCastException: cannot assign instance of 
scala.collection.immutable.List$SerializationProxy to field 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartitions
 of type scala.collection.Seq in instance of 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition at 
java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
 at 
java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
 at 
java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
 at 
java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
 at 
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419) 
at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
 at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687) at 
java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
 at 
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390) 
at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
 at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687) at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489) at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447) at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
 at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579) at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:829){quote}
The same script works fine I replace the streaming from kafka with rate format


{quote}val rateDF = spark.readStream
.format("rate")
.option("rowsPerSecond", 10)
.option("rampUpTime", 5)
.option("numPartitions", 2)
.load()

val processedDF = rateDF.select("timestamp","value"){quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to