Repository: spark
Updated Branches:
  refs/heads/master e3cd5b305 -> e651900bd


[SPARK-16344][SQL] Decoding Parquet array of struct with a single field named 
"element"

## What changes were proposed in this pull request?

For backward-compatibility reasons, the following Parquet schema is 
ambiguous:

```
optional group f (LIST) {
  repeated group list {
    optional group element {
      optional int32 element;
    }
  }
}
```

According to the parquet-format spec, when interpreted as a standard 3-level 
layout, this type is equivalent to the following SQL type:

```
ARRAY<STRUCT<element: INT>>
```

However, when interpreted as a legacy 2-level layout, it's equivalent to

```
ARRAY<STRUCT<element: STRUCT<element: INT>>>
```

Historically, to disambiguate these cases, we employed two methods:

- `ParquetSchemaConverter.isElementType()`

  Used to disambiguate the above cases while converting Parquet types to Spark 
types.

- `ParquetRowConverter.isElementType()`

  Used to disambiguate the above cases while instantiating row converters that 
convert Parquet records to Spark rows.

Unfortunately, these two methods make different decisions about the above 
problematic Parquet type, which caused SPARK-16344.

`ParquetRowConverter.isElementType()` is necessary for Spark 1.4 and earlier 
versions because Parquet requested schemata are directly converted from Spark 
schemata in these versions. The converted Parquet schemata may be incompatible 
with the actual schemata of the underlying physical files when the files are 
written by a system or library whose schema conversion scheme for Parquet LIST 
and MAP fields differs from Spark's.

In Spark 1.5, Parquet requested schemata are always properly tailored from 
schemata of physical files to be read. Thus 
`ParquetRowConverter.isElementType()` is no longer necessary. This PR replaces 
this method with a simple yet accurate scheme: whenever an ambiguous Parquet 
type is hit, convert the type in question back to a Spark type using 
`ParquetSchemaConverter` and check whether it matches the corresponding Spark 
type.
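
The scheme can be illustrated with a minimal, self-contained sketch. It does not use the real Parquet or Spark classes: `PType`, `CType`, `convertField`, and `elementParquetType` below are hypothetical stand-ins, `convertField` only handles ints and plain groups, and nullability is ignored (the actual patch compares types with `DataType.equalsIgnoreCompatibleNullability`).

```scala
// Hypothetical, simplified stand-ins for Parquet types (not the parquet-mr API).
sealed trait PType { def name: String }
final case class PInt(name: String) extends PType
final case class PGroup(name: String, fields: List[PType]) extends PType

// Hypothetical, simplified stand-ins for Catalyst types (not the Spark API).
sealed trait CType
case object CInt extends CType
final case class CStruct(fields: List[(String, CType)]) extends CType

object ListDisambiguation {
  // Stand-in for ParquetSchemaConverter.convertField: a group becomes a struct
  // of its converted children, an int32 becomes an INT.
  def convertField(t: PType): CType = t match {
    case PInt(_) => CInt
    case PGroup(_, fields) => CStruct(fields.map(f => f.name -> convertField(f)))
  }

  // Returns the Parquet type that actually holds the array elements, given the
  // repeated field of a LIST group and the expected Catalyst element type.
  def elementParquetType(repeated: PType, expectedElement: CType): PType =
    if (convertField(repeated) == expectedElement) {
      // Legacy 2-level layout: the repeated field itself is the element.
      repeated
    } else {
      // Standard 3-level layout: the repeated field is only the syntactic
      // group, so the element is its single child.
      repeated match {
        case PGroup(_, List(only)) => only
        case other =>
          throw new IllegalArgumentException(s"Unexpected repeated type: $other")
      }
    }
}
```

For the ambiguous schema above, the repeated field `list` converts to `STRUCT<element: STRUCT<element: INT>>`. If the expected element type is `STRUCT<element: INT>`, the types differ and the sketch descends into the child group `element` (3-level layout); if the expected element type is `STRUCT<element: STRUCT<element: INT>>`, they match and `list` itself is treated as the element (2-level layout).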

## How was this patch tested?

New test cases added in `ParquetHiveCompatibilitySuite` and `ParquetQuerySuite`.

Author: Cheng Lian <l...@databricks.com>

Closes #14014 from liancheng/spark-16344-for-master-and-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e651900b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e651900b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e651900b

Branch: refs/heads/master
Commit: e651900bd562cc29a3eb13e92a5147979e347f61
Parents: e3cd5b3
Author: Cheng Lian <l...@databricks.com>
Authored: Wed Jul 20 16:49:46 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Jul 20 16:49:46 2016 -0700

----------------------------------------------------------------------
 .../parquet/ParquetReadSupport.scala            |  3 +-
 .../parquet/ParquetRecordMaterializer.scala     |  6 +-
 .../parquet/ParquetRowConverter.scala           | 79 +++++++++++---------
 .../parquet/ParquetSchemaConverter.scala        |  8 +-
 .../datasources/parquet/ParquetQuerySuite.scala | 16 +++-
 .../hive/ParquetHiveCompatibilitySuite.scala    |  8 +-
 6 files changed, 73 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e651900b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 46d786d..0bee874 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -96,7 +96,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with
 
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
-      ParquetReadSupport.expandUDT(catalystRequestedSchema))
+      ParquetReadSupport.expandUDT(catalystRequestedSchema),
+      new ParquetSchemaConverter(conf))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e651900b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index 0818d80..d12e780 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -28,12 +28,14 @@ import org.apache.spark.sql.types.StructType
  *
  * @param parquetSchema Parquet schema of the records to be read
  * @param catalystSchema Catalyst schema of the rows to be constructed
+ * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
  */
 private[parquet] class ParquetRecordMaterializer(
-    parquetSchema: MessageType, catalystSchema: StructType)
+    parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
   extends RecordMaterializer[InternalRow] {
 
-  private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater)
+  private val rootConverter =
+    new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)
 
   override def getCurrentRecord: InternalRow = rootConverter.currentRecord
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e651900b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 9dad596..9ffc2b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, Type}
 import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
 
@@ -113,12 +113,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
  *
+ * @param schemaConverter A utility converter used to convert Parquet types to Catalyst types.
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
  *        types should have been expanded.
  * @param updater An updater which propagates converted field values to the parent container
  */
 private[parquet] class ParquetRowConverter(
+    schemaConverter: ParquetSchemaConverter,
     parquetType: GroupType,
     catalystType: StructType,
     updater: ParentContainerUpdater)
@@ -292,9 +294,10 @@ private[parquet] class ParquetRowConverter(
         new ParquetMapConverter(parquetType.asGroupType(), t, updater)
 
       case t: StructType =>
-        new ParquetRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
-          override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
-        })
+        new ParquetRowConverter(
+          schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
+            override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
+          })
 
       case t =>
         throw new RuntimeException(
@@ -442,13 +445,46 @@ private[parquet] class ParquetRowConverter(
     private val elementConverter: Converter = {
       val repeatedType = parquetSchema.getType(0)
       val elementType = catalystSchema.elementType
-      val parentName = parquetSchema.getName
 
-      if (isElementType(repeatedType, elementType, parentName)) {
+      // At this stage, we're not sure whether the repeated field maps to the element type or is
+      // just the syntactic repeated group of the 3-level standard LIST layout. Take the following
+      // Parquet LIST-annotated group type as an example:
+      //
+      //    optional group f (LIST) {
+      //      repeated group list {
+      //        optional group element {
+      //          optional int32 element;
+      //        }
+      //      }
+      //    }
+      //
+      // This type is ambiguous:
+      //
+      // 1. When interpreted as a standard 3-level layout, the `list` field is just the syntactic
+      //    group, and the entire type should be translated to:
+      //
+      //      ARRAY<STRUCT<element: INT>>
+      //
+      // 2. On the other hand, when interpreted as a non-standard 2-level layout, the `list` field
+      //    represents the element type, and the entire type should be translated to:
+      //
+      //      ARRAY<STRUCT<element: STRUCT<element: INT>>>
+      //
+      // Here we try to convert field `list` into a Catalyst type to see whether the converted type
+      // matches the Catalyst array element type. If it doesn't match, then it's case 1; otherwise,
+      // it's case 2.
+      val guessedElementType = schemaConverter.convertField(repeatedType)
+
+      if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)) {
+        // If the repeated field corresponds to the element type, creates a new converter using the
+        // type of the repeated field.
         newConverter(repeatedType, elementType, new ParentContainerUpdater {
           override def set(value: Any): Unit = currentArray += value
         })
       } else {
+        // If the repeated field corresponds to the syntactic group in the standard 3-level Parquet
+        // LIST layout, creates a new converter using the only child field of the repeated field.
+        assert(!repeatedType.isPrimitive && repeatedType.asGroupType().getFieldCount == 1)
         new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
       }
     }
@@ -462,37 +498,6 @@ private[parquet] class ParquetRowConverter(
     // in row cells.
     override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
 
-    // scalastyle:off
-    /**
-     * Returns whether the given type is the element type of a list or is a syntactic group with
-     * one field that is the element type.  This is determined by checking whether the type can be
-     * a syntactic group and by checking whether a potential syntactic group matches the expected
-     * schema.
-     * {{{
-     *   <list-repetition> group <name> (LIST) {
-     *     repeated group list {                          <-- repeatedType points here
-     *       <element-repetition> <element-type> element;
-     *     }
-     *   }
-     * }}}
-     * In short, here we handle Parquet list backwards-compatibility rules on the read path.  This
-     * method is based on `AvroIndexedRecordConverter.isElementType`.
-     *
-     * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
-     */
-    // scalastyle:on
-    private def isElementType(
-        parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = {
-      (parquetRepeatedType, catalystElementType) match {
-        case (t: PrimitiveType, _) => true
-        case (t: GroupType, _) if t.getFieldCount > 1 => true
-        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true
-        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true
-        case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
-        case _ => false
-      }
-    }
-
     /** Array element converter */
     private final class ElementConverter(parquetType: Type, catalystType: DataType)
       extends GroupConverter {

http://git-wip-us.apache.org/repos/asf/spark/blob/e651900b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index bcf535d..c81a65f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -260,7 +260,7 @@ private[parquet] class ParquetSchemaConverter(
     {
       // For legacy 2-level list types with primitive element type, e.g.:
       //
-      //    // List<Integer> (nullable list, non-null elements)
+      //    // ARRAY<INT> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated int32 element;
       //    }
@@ -270,7 +270,7 @@ private[parquet] class ParquetSchemaConverter(
       // For legacy 2-level list types whose element type is a group type with 2 or more fields,
       // e.g.:
       //
-      //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
+      //    // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated group element {
       //        required binary str (UTF8);
@@ -282,7 +282,7 @@ private[parquet] class ParquetSchemaConverter(
     } || {
       // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
       //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated group array {
       //        required binary str (UTF8);
@@ -293,7 +293,7 @@ private[parquet] class ParquetSchemaConverter(
     } || {
       // For Parquet data generated by parquet-thrift, e.g.:
       //
-      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
       //    optional group my_list (LIST) {
       //      repeated group my_list_tuple {
       //        required binary str (UTF8);

http://git-wip-us.apache.org/repos/asf/spark/blob/e651900b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 7c394e0..02b9445 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
 import org.apache.spark.sql.execution.BatchedDataSourceScanExec
-import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
+import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -668,9 +668,23 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-16344: array of struct with a single field named 'element'") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      Seq(Tuple1(Array(SingleElement(42)))).toDF("f").write.parquet(path)
+
+      checkAnswer(
+        sqlContext.read.parquet(path),
+        Row(Array(Row(42)))
+      )
+    }
+  }
 }
 
 object TestingUDT {
+  case class SingleElement(element: Long)
+
   @SQLUserDefinedType(udt = classOf[NestedStructUDT])
   case class NestedStruct(a: Integer, b: Long, c: Double)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e651900b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index ac89bbb..2b57646 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.sql.Timestamp
 
-import org.apache.hadoop.hive.conf.HiveConf
-
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -137,4 +135,10 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
       Row(Row(1, Seq("foo", "bar", null))),
       "STRUCT<f0: INT, f1: ARRAY<STRING>>")
   }
+
+  test("SPARK-16344: array of struct with a single field named 'array_element'") {
+    testParquetHiveCompatibility(
+      Row(Seq(Row(1))),
+      "ARRAY<STRUCT<array_element: INT>>")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
