[ https://issues.apache.org/jira/browse/SPARK-38789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Julian Keppel updated SPARK-38789:
----------------------------------

Description:

I use Structured Streaming to read Avro records from a Kafka topic A, apply some transformations, and write the result as Avro to another Kafka topic B. I use [these functions|https://spark.apache.org/docs/latest/sql-data-sources-avro.html#to_avro-and-from_avro] to serialize and deserialize the Avro records.

Originally, I faced a different exception (a parsing error with mode "FAILFAST") while reading from the second topic B with the specified schema in a separate job. I wondered how this could happen, because the schemas were exactly the same.

To discover more details, I came up with the following experimental setup to reproduce the exception. I found out that the serialized Avro records (with the exact same schema, after just some simple transformations) can't be deserialized again:

{code:java}
from pyspark.sql.types import Row, StructField, StructType, LongType, StringType, ArrayType
from pyspark.sql import functions as func
from pyspark.sql.avro import functions as afunc

avro_schema = """
{
    "fields": [
        {
            "name": "field_1",
            "type": ["string", "null"]
        },
        {
            "name": "field_2",
            "type": ["long", "null"]
        },
        {
            "name": "field_3",
            "type": [
                {"items": ["string", "null"], "type": "array"},
                "null"
            ]
        }
    ],
    "name": "bla",
    "namespace": "bla",
    "type": "record"
}
"""

# Create example DF
df = spark.createDataFrame(
    [Row("bla", 42, ["bla", "blubb"])],
    StructType(
        [
            StructField("field_1", StringType(), True),
            StructField("field_2", LongType(), True),
            StructField("field_3", ArrayType(StringType()), True),
        ]
    ),
)

# Serialize columns to Avro record
df_avro = df.select(afunc.to_avro(func.struct(df.columns), avro_schema))

# Now, change values in DF to some arbitrary value
df_changed_1 = df.withColumn("field_2", func.lit(998877665544332211))
df_changed_2 = df.withColumn("field_2", func.lit(None))  # To enforce nullability in schema for field_2
df_changed = df_changed_1.unionByName(df_changed_2)

# Again, serialize columns to Avro record with the same schema as before
df_changed_avro = df_changed.select(afunc.to_avro(func.struct(df_changed.columns), avro_schema))

# Schemas are exactly identical, which makes sense because nothing changed
# in the schema itself, only the values
df.schema
df_changed.schema

# Have a look at the DFs' values
df.show()
df_changed.show()

# Enforce DF evaluation
df_avro.show()          # This works as expected
df_changed_avro.show()  # This produces the exception
{code}
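Judging from the stack trace below (the failure comes out of {{AvroSerializer.newStructConverter}}), the serializer apparently matches each Catalyst field's nullability against the Avro union, so nullability is the part of the schema comparison that is easiest to miss. A minimal diagnostic sketch, using the DataFrames defined above (not part of the failing job itself):

{code:java}
# Diagnostic sketch: dump nullability field by field, since the Avro fields
# are all nullable unions like ["long", "null"].
for before, after in zip(df.schema.fields, df_changed.schema.fields):
    print(before.name, before.nullable, "->", after.nullable)

# The struct() expression handed to to_avro carries its own schema as well:
print(df.select(func.struct(df.columns).alias("s")).schema)
print(df_changed.select(func.struct(df_changed.columns).alias("s")).schema)
{code}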
The last line of the reproduction, {{df_changed_avro.show()}}, produces the following exception:

{code:java}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 35.0 failed 1 times, most recent failure: Lost task 2.0 in stage 35.0 (TID 109, 192.168.2.117, executor driver): org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type LongType to Avro type ["long","null"].
    at org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219)
    at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
    at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:232)
    at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:72)
    at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
    at org.apache.spark.sql.avro.CatalystDataToAvro.serializer$lzycompute(CatalystDataToAvro.scala:42)
    at org.apache.spark.sql.avro.CatalystDataToAvro.serializer(CatalystDataToAvro.scala:41)
    at org.apache.spark.sql.avro.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:54)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type LongType to Avro type ["long","null"].
    at org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219)
    at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
    at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:232)
    at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:72)
    at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
    at org.apache.spark.sql.avro.CatalystDataToAvro.serializer$lzycompute(CatalystDataToAvro.scala:42)
    at org.apache.spark.sql.avro.CatalystDataToAvro.serializer(CatalystDataToAvro.scala:41)
    at org.apache.spark.sql.avro.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:54)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
{code}

According to the documentation, Spark SQL types cannot be converted to Avro union types (at least, it is not explicitly [written down here|https://spark.apache.org/docs/latest/sql-data-sources-avro.html#supported-types-for-spark-sql---avro-conversion]), but then I wonder why it seems to work for the original DataFrame.

Am I missing something, or is this a bug? The same thing happens when I, for example, add a new field to the original DataFrame (df) and try to serialize it with an extended schema that includes the new field.
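For completeness, the original streaming job in which the FAILFAST parse error first appeared follows the usual Kafka round trip with {{from_avro}}/{{to_avro}}. A rough sketch of that pattern (bootstrap servers, topic names, and checkpoint path are placeholders, not the real job):

{code:java}
from pyspark.sql.avro.functions import from_avro, to_avro

# Read Avro records from topic A and deserialize them with the expected
# schema, failing fast on any record that does not parse.
parsed = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "host:9092")  # placeholder
    .option("subscribe", "topic_a")                  # placeholder
    .load()
    .select(from_avro(func.col("value"), avro_schema, {"mode": "FAILFAST"}).alias("r"))
    .select("r.*")
)

# ... transformations ...

# Serialize back to Avro with the same schema and write to topic B.
query = (
    parsed.select(to_avro(func.struct(parsed.columns), avro_schema).alias("value"))
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "host:9092")   # placeholder
    .option("topic", "topic_b")                       # placeholder
    .option("checkpointLocation", "/tmp/checkpoint")  # placeholder
    .start()
)
query.awaitTermination()
{code}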
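A possible workaround until this is clarified: rebuilding the transformed DataFrame against the original, explicitly nullable schema before calling {{to_avro}} might avoid the union mismatch. This is an untested sketch based on the reproduction above, not a confirmed fix:

{code:java}
# createDataFrame(rdd, schema) re-applies the declared schema, forcing every
# field back to the nullability the Avro unions expect, regardless of what
# the transformations inferred.
df_changed_nullable = spark.createDataFrame(df_changed.rdd, df.schema)

df_changed_nullable_avro = df_changed_nullable.select(
    afunc.to_avro(func.struct(df_changed_nullable.columns), avro_schema)
)
df_changed_nullable_avro.show()
{code}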
> Spark-Avro throws exception at to_avro after transformation although schema didn't change
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38789
>                 URL: https://issues.apache.org/jira/browse/SPARK-38789
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.3
>         Environment: PySpark Version 3.0.3
> Spark-Avro-Version 3.0.3
> Python Version 3.6.8
> Scala Version 2.12
>            Reporter: Julian Keppel
>            Priority: Major