[
https://issues.apache.org/jira/browse/SPARK-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365062#comment-15365062
]
Ryan Blue commented on SPARK-16344:
-----------------------------------
It looks like the main change is to specifically catch the 3-level name
structure, {{list-name (LIST) -> "list" -> "element"}}. The problem with this
approach is that it doesn't solve the problem entirely either.
Let me try to give a bit more background. In parquet-avro, there are two
{{isElementType}} methods; one in the schema converter and one in the record
converter. The one in the schema converter will guess whether the Parquet type
uses a 3-level list or a 2-level list when it can't be determined according to
the spec's backward-compatibility rules. That guess assumes a 2-level structure
by default and at the next major release will guess a 3-level structure. (This
can be controlled by a property.) But this is only used when the reader doesn't
supply a read schema / expected schema and the code has to convert from
Parquet's type to get one. Ideally, we always have a read schema from the file,
from the reader's expected class (if using Java objects), or from the reader
passing in the expected schema. That's why the other {{isElementType}} method
exists: it looks at the expected schema and the file schema to determine
whether the caller has passed in a schema with the extra single-field
list/element struct.
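As a minimal sketch of the schema-converter side, here is a toy Scala model of the guess described above. The types and names ({{PType}}, {{repeatedIsElement}}, {{assumeTwoLevel}}) are invented for illustration; the real logic lives in parquet-avro's schema converter and the actual rules in the parquet-format spec are more detailed:

```scala
// Toy model of the 2-level vs 3-level guess. These types are invented for
// illustration; they are not the parquet-mr or parquet-avro API.
sealed trait PType
case class Primitive(name: String) extends PType
case class Group(name: String, fields: List[PType]) extends PType

// Rough paraphrase of the spec's backward-compatibility rules: the repeated
// field IS the element type (legacy 2-level list) when it is a primitive, a
// group with more than one field, or a group named "array" or "*_tuple".
// Anything else falls through to a configurable default, mirroring the
// property mentioned above (2-level today, 3-level at the next major release).
def repeatedIsElement(repeated: PType, assumeTwoLevel: Boolean = false): Boolean =
  repeated match {
    case Primitive(_)                                        => true
    case Group(_, fields) if fields.size > 1                 => true
    case Group(n, _) if n == "array" || n.endsWith("_tuple") => true
    case _                                                   => assumeTwoLevel
  }

// The schema from this issue: a repeated group "list" wrapping a single-field
// group "element". None of the unambiguous rules fire, so the answer depends
// entirely on the configured default -- this is exactly the ambiguity at hand.
val ambiguous = Group("list", List(Group("element", List(Primitive("element")))))
```

With the default, {{repeatedIsElement(ambiguous)}} treats the repeated group as the 3-level wrapper; with {{assumeTwoLevel = true}} the same schema is read as a 2-level list of single-field records.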
That code has to distinguish between two cases for a 3-level list:
1. When the caller expects {{List<OneTuple<ElementType>>}}, with the extra
record layer that was originally returned when Avro only knew about 2-level
lists.
2. When the caller expects {{List<ElementType>}}, without an extra layer.
The code currently assumes that if the element schema appears to match the
repeated type, the caller has passed a schema indicating case 1. This issue
points out that the matching isn't perfect: an element with a single field
named "element" will incorrectly match case 1 when it is really case 2. The
problem with the solution in PR #14013, if it were applied to Avro, is that it
breaks when the caller is actually passing a schema for case 1.
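To see why the two cases collide, here is a minimal Scala sketch of the heuristic just described. The names ({{ExpectedElement}}, {{treatedAsCase1}}) are invented; this is not the real parquet-avro {{isElementType}}:

```scala
// Minimal sketch of the flawed heuristic -- invented names, not parquet-avro.
// An expected (read-schema) element record is reduced to its field names.
case class ExpectedElement(fieldNames: List[String])

// Heuristic: if the expected element is a record with exactly one field whose
// name matches the file's inner field, assume the caller wants case 1, i.e.
// List<OneTuple<ElementType>> with the wrapper record preserved.
def treatedAsCase1(expected: ExpectedElement, innerFieldName: String): Boolean =
  expected.fieldNames == List(innerFieldName)

// Case 1 caller: the element is OneTuple, a record with one field "element".
val oneTuple = ExpectedElement(List("element"))
// Case 2 caller: the element is a real record that genuinely has a single
// field named "element" (like case class A in the issue) -- the same shape!
val recordA = ExpectedElement(List("element"))
```

Both {{oneTuple}} and {{recordA}} pass {{treatedAsCase1(_, "element")}}, so a case 2 caller is misread as case 1, which is precisely the bug reported here.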
I'm not sure whether Spark works like Avro and has two {{isElementType}}
methods. If Spark can guarantee that the table schema is never case 1, then it
is correct to use the logic in the PR. I don't think that's always the case
because the table schema may come from user objects in a Dataset or from the
Hive MetaStore. But, this may be a reasonable heuristic if you think case 2 is
far more common than case 1. For parquet-avro, I think the user supplying a
single-field record with the inner field named "element" is rare enough that it
doesn't really matter, but that's a call for the Spark community to make on
this issue.
One last thing: based on the rest of the schema structure, there should be only
one way to match the expected schema to the file schema. You could always try
both and fall back to the other case, or have a more complicated
{{isElementType}} method that recurses down the sub-trees to find a match. I
didn't implement this in parquet-avro because I think it's a rare problem and
not worth the time.
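That last idea can be sketched as a recursive structural match over toy schema trees. Again, the types ({{Node}}, {{Leaf}}, {{Struct}}, {{ListOf}}) and the function names are assumptions invented for this sketch, not Parquet or Avro API types:

```scala
// Toy schema trees -- invented for this sketch, not Parquet/Avro API types.
sealed trait Node
case class Leaf(tpe: String) extends Node
case class Struct(fields: Map[String, Node]) extends Node
case class ListOf(elem: Node) extends Node

// Structural equality by recursing down both sub-trees.
def matches(expected: Node, file: Node): Boolean = (expected, file) match {
  case (Leaf(a), Leaf(b))       => a == b
  case (Struct(ea), Struct(fa)) => ea.keySet == fa.keySet &&
                                   ea.forall { case (k, v) => matches(v, fa(k)) }
  case (ListOf(e), ListOf(f))   => matches(e, f)
  case _                        => false
}

// For an ambiguous file list, try the 3-level element reading first, then
// fall back to the 2-level (extra wrapper record) reading.
def resolveElement(expected: Node, threeLevel: Node, twoLevel: Node): Node =
  if (matches(expected, threeLevel)) threeLevel
  else if (matches(expected, twoLevel)) twoLevel
  else sys.error("expected schema matches neither interpretation")
```

For the schema in this issue, an expected element of {{STRUCT<element: BIGINT>}} matches only the 3-level reading, while an expected extra wrapper record matches only the 2-level one, so the recursion disambiguates where the shallow name check cannot.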
> Array of struct with a single field name "element" can't be decoded from
> Parquet files written by Spark 1.6+
> ------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-16344
> URL: https://issues.apache.org/jira/browse/SPARK-16344
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0, 1.6.1, 1.6.2, 2.0.0
> Reporter: Cheng Lian
> Assignee: Cheng Lian
>
> This is a weird corner case. Users may hit this issue if they have a schema
> that
> # has an array field whose element type is a struct, and
> # the struct has one and only one field, and
> # that field is named "element".
> The following Spark shell snippet for Spark 1.6 reproduces this bug:
> {code}
> case class A(element: Long)
> case class B(f: Array[A])
> val path = "/tmp/silly.parquet"
> Seq(B(Array(A(42)))).toDF("f0").write.mode("overwrite").parquet(path)
> val df = sqlContext.read.parquet(path)
> df.printSchema()
> // root
> //  |-- f0: array (nullable = true)
> //  |    |-- element: struct (containsNull = true)
> //  |    |    |-- element: long (nullable = true)
> df.show()
> {code}
> Exception thrown:
> {noformat}
> org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/silly.parquet/part-r-00007-e06db7b0-5181-4a14-9fee-5bb452e883a0.gz.parquet
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>   at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>   at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.spark.sql.execution.datasources.parquet.CatalystPrimitiveConverter"
>   at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:37)
>   at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:266)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
>   at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
>   at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
>   ... 26 more
> {noformat}
> Spark 2.0.0-SNAPSHOT and Spark master also suffer from this issue. To reproduce it
> using these versions, just replace {{sqlContext}} in the above snippet with
> {{spark}}.
> The root cause is related to the Parquet backwards-compatibility rules for
> LIST types defined in [parquet-format
> spec|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists].
> The Spark SQL schema shown above
> {noformat}
> root
>  |-- f0: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- element: long (nullable = true)
> {noformat}
> is equivalent to the following SQL type:
> {noformat}
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: BIGINT>
>   >
> >
> {noformat}
> According to the parquet-format spec, the standard layout of a LIST-like
> structure is a 3-level layout:
> {noformat}
> <list-repetition> group <name> (LIST) {
>   repeated group list {
>     <element-repetition> <element-type> element;
>   }
> }
> {noformat}
> Thus, the standard representation of the aforementioned SQL type should be:
> {noformat}
> message root {
>   optional group f (LIST) {
>     repeated group list {
>       optional group element {    (1)
>         optional int64 element;   (2)
>       }
>     }
>   }
> }
> {noformat}
> Note that the two "element" fields are different:
> - The {{group}} field "element" at (1) is a "container" of the list element type.
> This is defined as part of the parquet-format spec.
> - The {{int64}} field "element" at (2) corresponds to the {{element}} field
> of case class {{A}} we defined above.
> However, due to historical reasons, various existing systems do not conform
> to the parquet-format spec and may write LIST structures in a non-standard
> layout. For example, parquet-avro and parquet-thrift use a 2-level layout like
> {noformat}
> // parquet-avro style
> <list-repetition> group <name> (LIST) {
> repeated <element-type> array;
> }
> // parquet-thrift style
> <list-repetition> group <name> (LIST) {
> repeated <element-type> <name>_tuple;
> }
> {noformat}
> To keep backwards-compatibility, the parquet-format spec defined a set of
> [backwards-compatibility
> rules|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules]
> to also recognize these patterns.
> Unfortunately, these backwards-compatibility rules make the Parquet schema
> we mentioned above ambiguous:
> {noformat}
> message root {
>   optional group f (LIST) {
>     repeated group list {
>       optional group element {
>         optional int64 element;
>       }
>     }
>   }
> }
> {noformat}
> When interpreted using the standard 3-level layout, it is the expected type:
> {noformat}
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: BIGINT>
>   >
> >
> {noformat}
> When interpreted using the legacy 2-level layout, it is the unexpected type:
> {noformat}
> // When interpreted as legacy 2-level layout
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: STRUCT<element: BIGINT>>
>   >
> >
> {noformat}
> This is because the nested struct field name happens to be "element", which
> is the dedicated name of the element "container" group in the standard
> 3-level layout; this collision leads to the ambiguity.
> Currently, Spark 1.6.x, 2.0.0-SNAPSHOT, and master choose the second
> interpretation. We can fix this issue by giving the standard 3-level layout a
> higher priority when trying to match schema patterns.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)