Cheng Lian created SPARK-16344:
----------------------------------

             Summary: Array of struct with a single field name "element" can't 
be decoded from Parquet files written by Spark 1.6+
                 Key: SPARK-16344
                 URL: https://issues.apache.org/jira/browse/SPARK-16344
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.6.2, 1.6.1, 1.6.0, 2.0.0
            Reporter: Cheng Lian
            Assignee: Cheng Lian


The following Spark shell snippet for Spark 1.6 reproduces this bug:

{code}
case class A(element: Long)
case class B(f: Array[A])

val path = "/tmp/silly.parquet"
Seq(B(Array(A(42)))).toDF("f0").write.mode("overwrite").parquet(path)

val df = sqlContext.read.parquet(path)
df.printSchema()
// root
//  |-- f0: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- element: long (nullable = true)

df.show()
{code}

Exception thrown:

{noformat}
org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/silly.parquet/part-r-00007-e06db7b0-5181-4a14-9fee-5bb452e883a0.gz.parquet
  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
  at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
  at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.spark.sql.execution.datasources.parquet.CatalystPrimitiveConverter"
  at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:37)
  at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:266)
  at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
  at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
  at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
  at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
  ... 26 more
{noformat}

Spark 2.0.0-SNAPSHOT and Spark master also suffer from this issue. To reproduce 
it using these versions, just replace {{sqlContext}} in the above snippet with 
{{spark}}.

The root cause is related to the Parquet backwards-compatibility rules for LIST 
types defined in the [parquet-format 
spec|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists].

The Spark SQL schema shown above

{noformat}
root
 |-- f0: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- element: long (nullable = true)
{noformat}

is equivalent to the following SQL type:

{noformat}
STRUCT<
  f: ARRAY<
    STRUCT<element: BIGINT>
  >
>
{noformat}

According to the parquet-format spec, the standard layout of a LIST-like 
structure is a 3-level layout:

{noformat}
<list-repetition> group <name> (LIST) {
  repeated group list {
    <element-repetition> <element-type> element;
  }
}
{noformat}

Thus, the standard representation of the aforementioned SQL type should be:

{noformat}
message root {
  optional group f (LIST) {
    repeated group list {
      optional group element {    (1)
        optional int64 element;   (2)
      }
    }
  }
}
{noformat}

Note that the two "element" fields are different:

- The {{group}} field "element" at (1) is a "container" of list element type. 
This is defined as part of the parquet-format spec.
- The {{int64}} field "element" at (2) corresponds to the {{element}} field of 
case class {{A}} we defined above.

However, for historical reasons, various existing systems do not conform to 
the parquet-format spec and may write LIST structures in a non-standard layout. 
For example, parquet-avro and parquet-thrift use 2-level layouts like the following:

{noformat}
// parquet-avro style
<list-repetition> group <name> (LIST) {
  repeated <element-type> array;
}

// parquet-thrift style
<list-repetition> group <name> (LIST) {
  repeated <element-type> <name>_tuple;
}
{noformat}

To remain backwards-compatible, the parquet-format spec defines a set of 
[backwards-compatibility 
rules|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules] 
so that readers also recognize these patterns.
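
As a rough illustration, the backwards-compatibility rules can be paraphrased as a single predicate on the repeated field nested inside a LIST group. The sketch below is plain Scala with no Parquet dependency. {{RepeatedField}} and {{isLegacyElement}} are hypothetical names introduced here, and the spec linked above remains the authoritative wording.

```scala
object LegacyListRules {
  // Hypothetical summary of the repeated field nested in a LIST-annotated
  // group; this is not a parquet-mr class.
  final case class RepeatedField(name: String, isGroup: Boolean, fieldCount: Int)

  // True when the repeated field itself is the list element (legacy 2-level
  // layout); false when it only wraps the element (standard 3-level layout).
  def isLegacyElement(repeated: RepeatedField, listName: String): Boolean =
    !repeated.isGroup ||                  // repeated primitive
    repeated.fieldCount > 1 ||            // repeated group with several fields
    repeated.name == "array" ||           // parquet-avro style
    repeated.name == listName + "_tuple"  // parquet-thrift style
}
```

Under this paraphrase, a single-field repeated group named {{array}} or {{f_tuple}} inside a LIST group {{f}} is treated as the element itself, while a single-field repeated group named {{list}} is treated as a wrapper around the element.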

Unfortunately, these backwards-compatibility rules make the Parquet schema we 
mentioned above ambiguous:

{noformat}
message root {
  optional group f (LIST) {
    repeated group list {
      optional group element {
        optional int64 element;
      }
    }
  }
}
{noformat}

When interpreted using the standard 3-level layout, it is the expected type:

{noformat}
STRUCT<
  f: ARRAY<
    STRUCT<element: BIGINT>
  >
>
{noformat}

When interpreted using the legacy 2-level layout, it is the unexpected type:

{noformat}
// When interpreted as legacy 2-level layout
STRUCT<
  f: ARRAY<
    STRUCT<element: STRUCT<element: BIGINT>>
  >
>
{noformat}

This is because the nested struct field happens to be named "element", which is 
also the dedicated name of the element type "container" group in the standard 
3-level layout. This name collision leads to the ambiguity.

Currently, Spark 1.6.x, 2.0.0-SNAPSHOT, and master choose the second 
interpretation. We can fix this issue by giving the standard 3-level layout a 
higher priority when trying to match schema patterns.
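
To make the two readings and the proposed priority concrete, here is a minimal sketch in plain Scala. It uses a toy schema model rather than the parquet-mr or Spark classes, all names in it ({{ListLayoutSketch}}, {{threeLevel}}, {{twoLevel}}, {{elementType}}) are hypothetical, and it should not be read as the actual Spark implementation or patch.

```scala
object ListLayoutSketch {
  // Toy model of Parquet schema nodes; not the parquet-mr API.
  sealed trait PType { def name: String }
  final case class Primitive(name: String, typeName: String) extends PType
  final case class Group(name: String, fields: List[PType]) extends PType

  // Toy model of the SQL types used in this report.
  sealed trait SqlType
  case object BigIntType extends SqlType
  final case class OtherType(parquetType: String) extends SqlType
  final case class StructOf(fields: List[(String, SqlType)]) extends SqlType

  // Converts a non-LIST Parquet node to its SQL type.
  def toSql(t: PType): SqlType = t match {
    case Primitive(_, "int64") => BigIntType
    case Primitive(_, other)   => OtherType(other)
    case Group(_, fields)      => StructOf(fields.map(f => f.name -> toSql(f)))
  }

  // Standard 3-level reading: `repeated group list { <type> element; }`.
  def threeLevel(repeated: PType): Option[SqlType] = repeated match {
    case Group("list", List(element)) if element.name == "element" =>
      Some(toSql(element))
    case _ => None
  }

  // Legacy 2-level reading: the repeated field itself is the element.
  def twoLevel(repeated: PType): SqlType = toSql(repeated)

  // Proposed priority: try the standard layout first, then fall back.
  def elementType(repeated: PType): SqlType =
    threeLevel(repeated).getOrElse(twoLevel(repeated))

  // The repeated field of column `f` from the Parquet schema in this report.
  val ambiguous: PType =
    Group("list", List(Group("element", List(Primitive("element", "int64")))))
}
```

In this model, {{twoLevel(ambiguous)}} yields the unexpected {{STRUCT<element: STRUCT<element: BIGINT>>}}, whereas {{elementType(ambiguous)}} yields the expected {{STRUCT<element: BIGINT>}} because the standard layout is matched first. A repeated group named {{array}} does not match the standard pattern and still falls back to the legacy reading.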



