[ https://issues.apache.org/jira/browse/SPARK-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365062#comment-15365062 ]

Ryan Blue commented on SPARK-16344:
-----------------------------------

It looks like the main change is to specifically catch the 3-level name 
structure, {{list-name (LIST) -> "list" -> "element"}}. The problem with this 
approach is that it doesn't solve the problem entirely either.

Let me try to give a bit more background. In parquet-avro, there are two 
{{isElementType}} methods; one in the schema converter and one in the record 
converter. The one in the schema converter will guess whether the Parquet type 
uses a 3-level list or a 2-level list when it can't be determined according to 
the spec's backward-compatibility rules. That guess assumes a 2-level structure 
by default and will switch to assuming a 3-level structure at the next major 
release. (This can be controlled by a property.) But this is only used when the reader doesn't 
supply a read schema / expected schema and the code has to convert from 
Parquet's type to get one. Ideally, we always have a read schema from the file, 
from the reader's expected class (if using Java objects), or from the reader 
passing in the expected schema. That's why the other {{isElementType}} method 
exists: it looks at the expected schema and the file schema to determine 
whether the caller has passed in a schema with the extra single-field 
list/element struct.
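
To make that concrete, here is roughly how a reader passes an expected schema to 
parquet-avro so that the second {{isElementType}} is what runs (a sketch from 
memory; treat the exact API names and the path as assumptions, not verified code):

{code}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}

val conf = new Configuration()

// The caller's expected schema: a record with an array-of-longs field.
val readSchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "B", "fields": [
    |  {"name": "f", "type": {"type": "array", "items": "long"}}
    |]}""".stripMargin)

// With a read schema supplied, the record converter compares it against the
// file schema instead of guessing 2-level vs. 3-level from the file alone.
AvroReadSupport.setAvroReadSchema(conf, readSchema)

val reader = AvroParquetReader.builder[GenericRecord](new Path("/tmp/silly.parquet"))
  .withConf(conf)
  .build()
{code}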

That code has to distinguish between two cases for a 3-level list:
1. When the caller expects {{List<OneTuple<ElementType>>}}, with the extra 
record layer that was originally returned when Avro only knew about 2-level 
lists.
2. When the caller expects {{List<ElementType>}}, without an extra layer.
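
To make the ambiguity concrete, here are the two expected schemas written out in 
Spark SQL types (my illustration, not code from either project) for the case 
where the element is itself a single-field struct named "element":

{code}
import org.apache.spark.sql.types._

// Case 1: the caller keeps the extra single-field wrapper record around a
// BIGINT element.
val case1 = ArrayType(StructType(Seq(StructField("element", LongType))))

// Case 2: the caller expects the element directly, but the element happens
// to be a struct whose only field is named "element".
val case2 = ArrayType(StructType(Seq(StructField("element", LongType))))

// The two expected schemas are structurally identical, so a check that only
// compares the expected element schema against the repeated type cannot tell
// the cases apart.
assert(case1 == case2)
{code}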

The code currently assumes that if the element schema appears to match the 
repeated type, the caller has passed a schema indicating case 1. This issue 
points out that the matching isn't perfect: an element with a single field 
named "element" will incorrectly match case 1 when it is really case 2. The 
problem with the solution in PR #14013, if it were applied to Avro, is that it 
breaks if the caller is actually passing a schema for case 1.

I'm not sure whether Spark works like Avro and has two {{isElementType}} 
methods. If Spark can guarantee that the table schema is never case 1, then it 
is correct to use the logic in the PR. I don't think that's always the case, 
because the table schema may come from user objects in a Dataset or from the 
Hive MetaStore. But this may be a reasonable heuristic if you think case 2 is 
far more common than case 1. For parquet-avro, I think a user supplying a 
single-field record with the inner field named "element" is rare enough that it 
doesn't really matter, but it's up to the Spark community to decide on this 
issue.

One last thing: based on the rest of the schema structure, there should be only 
one way to match the expected schema to the file schema. You could try one 
interpretation and fall back to the other, or have a more complicated 
{{isElementType}} method that recurses down the sub-trees to find a match. I 
didn't implement this in parquet-avro because I think it's a rare problem and 
not worth the time.
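
For what it's worth, the recursive version could look something like this toy 
sketch (simplified stand-in types, not parquet-avro's real classes):

{code}
// Toy stand-ins for Parquet and reader schemas.
sealed trait PType { def name: String }
case class PGroup(name: String, fields: List[PType]) extends PType
case class PInt64(name: String) extends PType

sealed trait RType
case class RStruct(fields: List[(String, RType)]) extends RType
case object RBigInt extends RType

// Structural match that recurses into sub-trees instead of stopping at
// field names.
def matches(p: PType, r: RType): Boolean = (p, r) match {
  case (PInt64(_), RBigInt) => true
  case (PGroup(_, pf), RStruct(rf)) =>
    pf.length == rf.length && pf.zip(rf).forall {
      case (pt, (rn, rt)) => pt.name == rn && matches(pt, rt)
    }
  case _ => false
}

// The repeated group is the element itself (case 1) only if the whole group
// matches the expected element schema; otherwise it is a 3-level container.
def isElementType(repeated: PGroup, expectedElement: RType): Boolean =
  matches(repeated, expectedElement)

// The ambiguous layout from this issue:
//   repeated group list { optional group element { optional int64 element } }
val repeated = PGroup("list", List(PGroup("element", List(PInt64("element")))))
val expected = RStruct(List("element" -> RBigInt))

// Recursion sees that the group's "element" field is itself a group, not an
// int64, so the repeated group is a container (case 2), not the element.
assert(!isElementType(repeated, expected))
{code}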

> Array of struct with a single field name "element" can't be decoded from 
> Parquet files written by Spark 1.6+
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-16344
>                 URL: https://issues.apache.org/jira/browse/SPARK-16344
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1, 1.6.2, 2.0.0
>            Reporter: Cheng Lian
>            Assignee: Cheng Lian
>
> This is a weird corner case. Users may hit this issue if they have a schema 
> that
> # has an array field whose element type is a struct, and
> # the struct has one and only one field, and
> # that field is named "element".
> The following Spark shell snippet for Spark 1.6 reproduces this bug:
> {code}
> case class A(element: Long)
> case class B(f: Array[A])
> val path = "/tmp/silly.parquet"
> Seq(B(Array(A(42)))).toDF("f").write.mode("overwrite").parquet(path)
> val df = sqlContext.read.parquet(path)
> df.printSchema()
> // root
> //  |-- f: array (nullable = true)
> //  |    |-- element: struct (containsNull = true)
> //  |    |    |-- element: long (nullable = true)
> df.show()
> {code}
> Exception thrown:
> {noformat}
> org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/silly.parquet/part-r-00007-e06db7b0-5181-4a14-9fee-5bb452e883a0.gz.parquet
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>         at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>         at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.spark.sql.execution.datasources.parquet.CatalystPrimitiveConverter"
>         at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:37)
>         at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:266)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
>         at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
>         at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
>         ... 26 more
> {noformat}
> Spark 2.0.0-SNAPSHOT and Spark master also suffer from this issue. To 
> reproduce it with these versions, just replace {{sqlContext}} in the above 
> snippet with {{spark}}:
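> {code}
> // Spark 2.0+: sqlContext is superseded by the SparkSession, spark
> val df = spark.read.parquet(path)
> df.show()
> {code}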
> The root cause is related to the Parquet backwards-compatibility rules for 
> LIST types defined in the [parquet-format 
> spec|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists].
> The Spark SQL schema shown above
> {noformat}
> root
>  |-- f: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- element: long (nullable = true)
> {noformat}
> is equivalent to the following SQL type:
> {noformat}
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: BIGINT>
>   >
> >
> {noformat}
> According to the parquet-format spec, the standard layout of a LIST-like 
> structure is a 3-level layout:
> {noformat}
> <list-repetition> group <name> (LIST) {
>   repeated group list {
>     <element-repetition> <element-type> element;
>   }
> }
> {noformat}
> Thus, the standard representation of the aforementioned SQL type should be:
> {noformat}
> message root {
>   optional group f (LIST) {
>     repeated group list {
>       optional group element {    (1)
>         optional int64 element;   (2)
>       }
>     }
>   }
> }
> {noformat}
> Note that the two "element" fields are different:
> - The {{group}} field "element" at (1) is a "container" of the list element 
> type. This name is defined as part of the parquet-format spec.
> - The {{int64}} field "element" at (2) corresponds to the {{element}} field 
> of the case class {{A}} defined above.
> However, due to historical reasons, various existing systems do not conform 
> to the parquet-format spec and may write LIST structures in a non-standard 
> layout. For example, parquet-avro and parquet-thrift use a 2-level layout like
> {noformat}
> // parquet-avro style
> <list-repetition> group <name> (LIST) {
>   repeated <element-type> array;
> }
> // parquet-thrift style
> <list-repetition> group <name> (LIST) {
>   repeated <element-type> <name>_tuple;
> }
> {noformat}
> To keep backwards-compatibility, the parquet-format spec defines a set of 
> [backwards-compatibility 
> rules|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules]
>  to also recognize these patterns.
> Unfortunately, these backwards-compatibility rules make the Parquet schema 
> we mentioned above ambiguous:
> {noformat}
> message root {
>   optional group f (LIST) {
>     repeated group list {
>       optional group element {
>         optional int64 element;
>       }
>     }
>   }
> }
> {noformat}
> When interpreted using the standard 3-level layout, it is the expected type:
> {noformat}
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: BIGINT>
>   >
> >
> {noformat}
> When interpreted using the legacy 2-level layout, it is the unexpected type:
> {noformat}
> // When interpreted as legacy 2-level layout
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: STRUCT<element: BIGINT>>
>   >
> >
> {noformat}
> This is because the nested struct field name happens to be "element", which 
> is the dedicated name of the element type "container" group in the standard 
> 3-level layout; this is what leads to the ambiguity.
> Currently, Spark 1.6.x, 2.0.0-SNAPSHOT, and master all choose the second 
> interpretation. We can fix this issue by giving the standard 3-level layout 
> higher priority when trying to match schema patterns.
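> For illustration, the proposed priority over a simplified Parquet type model 
> might look like this (toy types, not the actual converter code):
> {code}
> sealed trait PType { def name: String }
> case class PGroup(name: String, fields: List[PType]) extends PType
> case class PPrimitive(name: String) extends PType
> 
> // Try the standard 3-level pattern first; fall back to the legacy 2-level
> // interpretation only when the standard pattern doesn't apply.
> def elementOf(repeated: PType): PType = repeated match {
>   // Standard 3-level layout: repeated group "list" wrapping the element.
>   case PGroup("list", element :: Nil) => element
>   // Legacy 2-level layout: the repeated field itself is the element.
>   case other => other
> }
> {code}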


