[ 
https://issues.apache.org/jira/browse/SPARK-38789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julian Keppel updated SPARK-38789:
----------------------------------
    Description: 
I use Structured Streaming to read Avro records from a Kafka topic A, apply some transformations and write the result as Avro to another Kafka topic B. I use [these functions|https://spark.apache.org/docs/latest/sql-data-sources-avro.html#to_avro-and-from_avro] for serializing and deserializing the Avro records.
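
For context, the streaming job has roughly the following shape. This is only a minimal sketch: the topic names, bootstrap servers, checkpoint location and the transformation step are placeholders, and avro_schema refers to the JSON schema shown in the repro below.

{code:python}
from pyspark.sql import functions as func
from pyspark.sql.avro.functions import from_avro, to_avro

# Read Avro-encoded records from topic A (placeholder names/servers).
source = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
    .option("subscribe", "topic_a")                    # placeholder
    .load()
)

# Deserialize the Kafka value column with the same schema used for writing.
parsed = source.select(
    from_avro(func.col("value"), avro_schema, {"mode": "FAILFAST"}).alias("record")
).select("record.*")

transformed = parsed  # the actual transformations happen here

# Serialize back to Avro and write to topic B.
query = (
    transformed
    .select(to_avro(func.struct(*transformed.columns), avro_schema).alias("value"))
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
    .option("topic", "topic_b")                        # placeholder
    .option("checkpointLocation", "/tmp/checkpoint")   # placeholder
    .start()
)
{code}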

Originally, in a separate job that reads from the second topic B with the specified schema, I got a different exception (a parsing error with mode "FAILFAST"). I wondered how this could happen, because the schemas are exactly the same.

To dig into the details, I came up with the following experimental setup to reproduce the problem. It turns out that the records (with the exact same schema, after just some simple transformations) can no longer be serialized to Avro:

 
{code:python}
from pyspark.sql.types import StringType, Row, StructField, StructType, LongType, ArrayType

from pyspark.sql import functions as func
from pyspark.sql.avro import functions as afunc

avro_schema = """
{
    "fields": [
        {
            "name": "field_1",
            "type": [
                "string",
                "null"
            ]
        },
        {
            "name": "field_2",
            "type": [
                "long",
                "null"
            ]
        },
        {
            "name": "field_3",
            "type": [
                {
                    "items": [
                        "string",
                        "null"
                    ],
                    "type": "array"
                },
                "null"
            ]
        }
    ],
    "name": "bla",
    "namespace": "bla",
    "type": "record"
}
"""

# Create example DF
df = spark.createDataFrame(
    [Row("bla", 42, ["bla", "blubb"])],
    StructType(
        [
            StructField("field_1", StringType(), True),
            StructField("field_2", LongType(), True),
            StructField("field_3", ArrayType(StringType()), True),
        ]
    ),
)

# Serialize columns to Avro record
df_avro = df.select(afunc.to_avro(func.struct(df.columns), avro_schema))

# Now, change values in DF to some arbitrary value
df_changed_1 = df.withColumn("field_2", func.lit(998877665544332211))
df_changed_2 = df.withColumn("field_2", func.lit(None))  # To enforce nullability in schema for field_2
df_changed = df_changed_1.unionByName(df_changed_2)

# Again, serialize columns to Avro record with same schema as before
df_changed_avro = df_changed.select(afunc.to_avro(func.struct(df_changed.columns), avro_schema))

# Schemas are exactly identical, which makes sense because nothing changed in the schema itself, only the values
df.schema
df_changed.schema

# Have a look at the DFs values
df.show()
df_changed.show()

# Force DF evaluation
df_avro.show()  # This works as expected
df_changed_avro.show()  # This produces the exception{code}
 

The last line produces the following exception: 
{code:java}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 35.0 failed 1 times, most recent failure: Lost task 2.0 in stage 35.0 (TID 109, 192.168.2.117, executor driver): org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type LongType to Avro type ["long","null"].
        at org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219)
        at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
        at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:232)
        at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:72)
        at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
        at org.apache.spark.sql.avro.CatalystDataToAvro.serializer$lzycompute(CatalystDataToAvro.scala:42)
        at org.apache.spark.sql.avro.CatalystDataToAvro.serializer(CatalystDataToAvro.scala:41)
        at org.apache.spark.sql.avro.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:54)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type LongType to Avro type ["long","null"].
        at org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219)
        at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
        at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:232)
        at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:72)
        at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
        at org.apache.spark.sql.avro.CatalystDataToAvro.serializer$lzycompute(CatalystDataToAvro.scala:42)
        at org.apache.spark.sql.avro.CatalystDataToAvro.serializer(CatalystDataToAvro.scala:41)
        at org.apache.spark.sql.avro.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:54)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more {code}
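
Reading the stack trace, the exception is raised while AvroSerializer maps the Catalyst StructType of the column handed to to_avro onto the given Avro schema. So, in addition to comparing the top-level DataFrame schemas, it may be worth comparing the per-field nullable flags of that struct expression itself. A small sketch of such a check (just my attempt at narrowing it down; it reuses df, df_changed and func from the repro above, and the assumption that nullability is the relevant difference is not confirmed):

{code:python}
# Inspect the Catalyst type of the struct expression that to_avro receives,
# in particular the nullable flag of each field.
struct_before = df.select(func.struct(df.columns).alias("s"))
struct_after = df_changed.select(func.struct(df_changed.columns).alias("s"))

struct_before.printSchema()
struct_after.printSchema()

# The same comparison, programmatically:
print([(f.name, f.nullable) for f in struct_before.schema["s"].dataType.fields])
print([(f.name, f.nullable) for f in struct_after.schema["s"].dataType.fields])
{code}
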
According to the documentation, Spark SQL types cannot be converted to Avro union types (at least such a conversion is not explicitly [listed here|https://spark.apache.org/docs/latest/sql-data-sources-avro.html#supported-types-for-spark-sql---avro-conversion]), but then I wonder why it seems to work for the original DataFrame.

Am I missing something, or is this a bug? The same thing happens when I, for example, add a new field to the original DataFrame (df) and try to serialize it with an extended schema that includes the new field.
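
In case it helps with triage, here is a variation of the repro above (same session, imports, avro_schema and DataFrames) that serializes the two branches of the union separately; I'd expect it to show whether the literal column, the null column, or only the combined union triggers the exception:

{code:python}
# Serialize each intermediate DataFrame on its own to narrow down which one
# actually triggers the IncompatibleSchemaException.
for name, frame in [("df_changed_1", df_changed_1),
                    ("df_changed_2", df_changed_2),
                    ("df_changed", df_changed)]:
    try:
        frame.select(afunc.to_avro(func.struct(frame.columns), avro_schema)).show()
        print(name, "-> serialized fine")
    except Exception as exc:
        print(name, "-> failed with", type(exc).__name__)
{code}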

 

> Spark-Avro throws exception at to_avro after transformation although schema 
> didn't change
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-38789
>                 URL: https://issues.apache.org/jira/browse/SPARK-38789
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.3
>         Environment: PySpark Version 3.0.3
> Spark-Avro-Version 3.0.3
> Python Version 3.6.8
> Scala Version 2.12
>            Reporter: Julian Keppel
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
