[ 
https://issues.apache.org/jira/browse/SPARK-36803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-36803:
-----------------------------------

    Assignee: Ivan

> ClassCastException: optional int32 col-0 is not a group when reading legacy 
> Parquet files 
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-36803
>                 URL: https://issues.apache.org/jira/browse/SPARK-36803
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2
>            Reporter: Ivan
>            Assignee: Ivan
>            Priority: Major
>             Fix For: 3.2.0, 3.1.3, 3.0.4
>
>
> When reading Parquet files that were written in legacy mode and are read with schema evolution, we observed that 2-level LIST-annotated types are traversed incorrectly.
> The root cause is an imprecise check on the underlying element type for Array types (and potentially Map types, but I have not checked those yet) that happens here:
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala#L606]
> The issue is only reproducible with schema evolution, with the parquet-mr reader, and when there are two schemas like this:
> File 1:
> {code:java}
> root
>  |-- col-0: array (nullable = true)
>  |    |-- element: struct (containsNull = false)
>  |    |    |-- col-0: integer (nullable = true)
> {code}
> File 2:
> {code:java}
> root
>  |-- col-0: array (nullable = true)
>  |    |-- element: struct (containsNull = false)
>  |    |    |-- col-0: integer (nullable = true)
>  |    |    |-- col-1: integer (nullable = true)
> {code}
>  
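> For reference, at the Parquet level these correspond roughly to the following physical layouts (an illustrative sketch based on the Parquet LIST spec: with legacy mode and non-nullable elements, the writer emits a 2-level list whose repeated group is the element itself):
> {noformat}
> # File 1, legacy 2-level list: the repeated group IS the element
> message spark_schema {
>   optional group col-0 (LIST) {
>     repeated group array {
>       optional int32 col-0;
>     }
>   }
> }
> 
> # The same column in the standard 3-level layout, for comparison
> message spark_schema {
>   optional group col-0 (LIST) {
>     repeated group list {
>       required group element {
>         optional int32 col-0;
>       }
>     }
>   }
> }
> {noformat}
>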
> When ParquetRowConverter tries to unwrap an ArrayType, it checks whether the underlying Parquet and Spark element types match. However, in the case above the requested read schema includes both fields, resulting in a mismatch and a failure to read File 1:
> {noformat}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 11.0 failed 1 times, most recent failure: Lost task 1.0 in stage 11.0 (TID 18) (ip-1-2-3-4.us-west-2.compute.internal executor driver): java.lang.ClassCastException: optional int32 col-0 is not a group
> at org.apache.parquet.schema.Type.asGroupType(Type.java:248)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:424)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter$ElementConverter.<init>(ParquetRowConverter.scala:633)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.<init>(ParquetRowConverter.scala:616)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:390)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.$anonfun$fieldConverters$1(ParquetRowConverter.scala:214)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at scala.collection.TraversableLike.map(TraversableLike.scala:286)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.<init>(ParquetRowConverter.scala:210)
> {noformat}
> This happens due to L606 in ParquetRowConverter: 
> {code:java}
> DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)
> {code}
> The code assumes that we are working with 3-level lists and incorrectly removes the “dummy” level from the Parquet schema.
> The actual error varies depending on column names: in this case the struct field name matches the primitive field name, so we end up with "optional int32 col-0 is not a group". In other cases it could fail with an IndexOutOfBoundsException or a NoSuchElementException when the column name is not found in the struct.
> The reason it works with 3-level lists is that DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) always evaluates to false for them, so we remove the “dummy” level and perform a struct match, which takes schema evolution into account.
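> A minimal sketch of why the check misfires (assuming access to the package-private DataType.equalsIgnoreCompatibleNullability; the types are the ones from the repro below):
> {code:java}
> import org.apache.spark.sql.types._
> 
> // Element type guessed from File 1's repeated group:
> val guessedElementType = StructType(
>   StructField("col-0", IntegerType, true) :: Nil)
> // Element type from the requested (evolved) read schema:
> val elementType = StructType(
>   StructField("col-0", IntegerType, true) ::
>   StructField("col-1", IntegerType, true) :: Nil)
> 
> // false: the converter then assumes a 3-level list and strips the repeated
> // group, treating the inner "optional int32 col-0" as the list element.
> DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)
> {code}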
>  
> Repro:
> {code:java}
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> 
> // Write lists in the legacy 2-level layout; the bug only affects legacy files.
> spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
> 
> val schema1 = StructType(
>   StructField("col-0", ArrayType(
>     StructType(
>       StructField("col-0", IntegerType, true) :: Nil
>     ),
>     containsNull = false
>   )) :: Nil
> )
> val rdd1 = sc.parallelize(Row(Array(Row(1))) :: Nil, 1)
> val df1 = spark.createDataFrame(rdd1, schema1)
> df1.write.parquet("/tmp/legacy-parquet")
> 
> val schema2 = StructType(
>   StructField("col-0", ArrayType(
>     StructType(
>       StructField("col-0", IntegerType, true) ::
>       StructField("col-1", IntegerType, true) :: Nil
>     ),
>     containsNull = false
>   )) :: Nil
> )
> val rdd2 = sc.parallelize(Row(Array(Row(1, 2))) :: Nil, 1)
> val df2 = spark.createDataFrame(rdd2, schema2)
> df2.write.mode("append").parquet("/tmp/legacy-parquet")
> 
> // Fails with: Caused by: ClassCastException: optional int32 col-0 is not a group
> spark.read.schema(schema2).parquet("/tmp/legacy-parquet").show()
> {code}
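> To double-check the physical layout of what was written, one can print the footer schema of a part file with parquet-mr's public API (a sketch; the part-file name is a placeholder):
> {code:java}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.hadoop.ParquetFileReader
> import org.apache.parquet.hadoop.util.HadoopInputFile
> 
> // Replace with an actual part file under /tmp/legacy-parquet
> val partFile = new Path("/tmp/legacy-parquet/part-00000-<uuid>.snappy.parquet")
> val reader = ParquetFileReader.open(HadoopInputFile.fromPath(partFile, new Configuration()))
> try println(reader.getFooter.getFileMetaData.getSchema)
> finally reader.close()
> {code}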
>  


