[ https://issues.apache.org/jira/browse/SPARK-54712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
regis le bretonnic updated SPARK-54712:
---------------------------------------
Description:
Hello,
I'm facing an error when running a very simple Structured Streaming test.
This fails:
{code:scala}
import org.apache.spark.sql.streaming.Trigger

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "mykafka:9092")
  .option("subscribe", "mytopic")
  .option("startingOffsets", "latest")
  .load()

val processedDF = kafkaDF.select("topic", "key", "value")

processedDF.writeStream
  .format("parquet")
  .option("path", "/tmp/data/kafka/mytopic")
  .option("checkpointLocation", "/tmp/checkpoint/kafka/mytopic")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode("append")
  .start()
  .awaitTermination()
{code}
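For context on the failure below, the session's own Spark and Scala versions can be confirmed directly in the shell; a quick check against version drift (the expected values in the comments mirror the environment listed at the end):

{code:scala}
// Quick sanity check in the same spark-shell session.
println(spark.version)                        // expected: 3.5.3
println(scala.util.Properties.versionString)  // expected: version 2.12.18
{code}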
The error is:
{noformat}
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartitions of type scala.collection.Seq in instance of org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
  at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
  at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
  at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
  at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
  at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
  at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
  at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
  at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
  at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
  at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
  at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
  at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
  at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}
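For what it's worth, this particular ClassCastException (a List$SerializationProxy assigned to a scala.collection.Seq field during task deserialization) is often a symptom of mixed Scala binary versions on the classpath, for example a Kafka connector jar built for Scala 2.13 running against a Spark 3.5.3 / Scala 2.12 runtime. A hypothetical build.sbt sketch of a consistent dependency setup (the versions simply mirror the environment listed at the end):

{code:scala}
// Hypothetical build.sbt for this reproduction; every Spark artifact must
// share the same Spark version and Scala binary version.
scalaVersion := "2.12.18"

val sparkVersion = "3.5.3"

libraryDependencies ++= Seq(
  // Provided by the cluster at runtime.
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  // %% appends "_2.12" here; a "_2.13" connector jar on the executor
  // classpath would fail task deserialization much like the trace above.
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
{code}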
The same script works fine if I replace the Kafka source with the rate format:
{code:scala}
val rateDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .option("rampUpTime", 5)
  .option("numPartitions", 2)
  .load()

val processedDF = rateDF.select("timestamp", "value")
{code}
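For completeness, a self-contained sketch of that passing rate-source variant wired to the same parquet sink; the local[*] master, the object name, and the /tmp paths are illustrative:

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object RateToParquet {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; the report itself ran against a cluster.
    val spark = SparkSession.builder()
      .appName("RateToParquet")
      .master("local[*]")
      .getOrCreate()

    val rateDF = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10)
      .load()

    // Same sink configuration as the failing Kafka variant above.
    rateDF.select("timestamp", "value").writeStream
      .format("parquet")
      .option("path", "/tmp/data/rate")
      .option("checkpointLocation", "/tmp/checkpoint/rate")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}
{code}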
> serialization error with spark streaming
> ----------------------------------------
>
> Key: SPARK-54712
> URL: https://issues.apache.org/jira/browse/SPARK-54712
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.5.3
> Environment: spark: 3.5.3
> scala: 2.12.18
> java: 11.0.26
> Reporter: regis le bretonnic
> Priority: Blocker
>