Repository: spark
Updated Branches:
  refs/heads/branch-1.2 abdcec673 -> e080cc3e5


[SPARK-5775] BugFix: GenericRow cannot be cast to SpecificMutableRow when 
nested data and partitioned table

The bug fixed here was caused by a change in ParquetTableScan's handling of
partitioned tables.

- When a partition column is requested from a Parquet table, the table scan
needs to add that column back to the output Rows.
- To update the Row objects it produces, ParquetTableScan first cast each Row
to SpecificMutableRow before updating it.
- This cast was unsafe, since there is no guarantee that the NewHadoopRDD used
internally instantiates the output Rows as MutableRow.

In particular, when reading a table with complex types (e.g. struct or array),
the NewHadoopRDD uses a parquet.io.api.RecordMaterializer produced by
org.apache.spark.sql.parquet.RowReadSupport. When complex types are involved,
the org.apache.spark.sql.parquet.CatalystConverter.createRootConverter factory
builds this converter as an org.apache.spark.sql.parquet.CatalystGroupConverter
(a) rather than an org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter (b).

Converter (a) outputs GenericRow, while converter (b) produces
SpecificMutableRow.
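
To make the failure mode concrete, here is a minimal, hypothetical sketch (not
the actual scan code) of what the old unconditional cast amounts to; the
fillPartitionColumn helper and its parameters are invented for illustration:

  import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}

  // Hypothetical, simplified version of the old partition-filling loop.
  def fillPartitionColumn(iter: Iterator[Row], ordinal: Int, value: Any): Iterator[Row] =
    iter.map { row =>
      // Throws ClassCastException whenever `row` is a GenericRow, i.e. whenever
      // the requested schema contains a struct or array column (case (a) above).
      val mutable = row.asInstanceOf[SpecificMutableRow]
      mutable(ordinal) = value
      mutable
    }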

Therefore, any query selecting a partition column together with a complex-type
column gets its rows back as GenericRows and falls into the unsafe cast (see
https://issues.apache.org/jira/browse/SPARK-5775 for an example).
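
For instance, a query shaped like the regression test added in this patch is
enough to trigger the failure, because it touches both the partition column `p`
and a struct field (the table name below is the one created by the new test
suite):

  sql("""
    SELECT p, structField.intStructField, structField.stringStructField
    FROM partitioned_parquet_with_complextypes
    WHERE p = 1""").collect()   // failed with a ClassCastException before this fix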

The fix originally proposed here replaced the unsafe cast with a pattern match
on the Row type, updating the Row in place if it is mutable and rebuilding a
new Row otherwise.

This PR now implements the solution updated by liancheng in
aa39460d4bb4c41084d350ccb1c5a56cd61239b7:

The fix checks whether every requested column is of primitive type, mirroring
the check in org.apache.spark.sql.parquet.CatalystConverter.createRootConverter
(a condensed sketch follows the list below):
 - If all columns are primitive, the Row can safely be cast to a MutableRow.
 - Otherwise, a new GenericMutableRow is created, and the partition column
values are written into this new row structure.
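
In outline, the scan now branches roughly as follows; this is a condensed
sketch of the ParquetTableScan change in the diff below, with the
partition-value loop factored into a hypothetical fillPartitionValues helper:

  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.parquet.ParquetTypesConverter

  // Condensed sketch of the new branching logic; `fillPartitionValues` stands
  // in for the loop that writes partition values at the requested ordinals.
  def rowsWithPartitionValues(
      output: Seq[Attribute],
      iter: Iterator[Row],
      fillPartitionValues: MutableRow => Row): Iterator[Row] = {
    val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))

    if (primitiveRow) {
      // All requested columns are primitive: CatalystPrimitiveRowConverter was
      // used, so each row really is a SpecificMutableRow and the cast is safe.
      iter.map(row => fillPartitionValues(row.asInstanceOf[SpecificMutableRow]))
    } else {
      // Complex types present: CatalystGroupConverter returns immutable
      // GenericRows, so copy each one into a reusable GenericMutableRow first.
      val mutableRow = new GenericMutableRow(output.size)
      iter.map { row =>
        var i = 0
        while (i < row.size) { mutableRow(i) = row(i); i += 1 }
        fillPartitionValues(mutableRow)
      }
    }
  }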

This fix is unit-tested in
sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala.

Author: Anselme Vignon <[email protected]>
Author: Cheng Lian <[email protected]>

Closes #4697 from anselmevignon/local_dev and squashes the following commits:

6a4c53d [Anselme Vignon] style corrections
52f73fc [Cheng Lian] cherry-pick & merge from 
aa39460d4bb4c41084d350ccb1c5a56cd61239b7
8fc6a8c [Anselme Vignon] correcting tests on temporary tables
24928ea [Anselme Vignon] corrected mirror bug (see SPARK-5775) for newParquet
7c829cb [Anselme Vignon] bugfix, hopefully correct this time
005a7f8 [Anselme Vignon] added test cleanup
22cec52 [Anselme Vignon] lint compatible changes
ae48f7c [Anselme Vignon] unittesting SPARK-5775
f876dea [Anselme Vignon] starting to write tests
dbceaa3 [Anselme Vignon] cutting lines
4eb04e9 [Anselme Vignon] bugfix SPARK-5775


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e080cc3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e080cc3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e080cc3e

Branch: refs/heads/branch-1.2
Commit: e080cc3e5019379ac78323c03c1b991b433c6ca9
Parents: abdcec6
Author: Anselme Vignon <[email protected]>
Authored: Mon Mar 23 12:00:50 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Mon Mar 23 12:00:50 2015 -0700

----------------------------------------------------------------------
 .../sql/parquet/ParquetTableOperations.scala    |  58 +++++--
 .../apache/spark/sql/parquet/newParquet.scala   |  47 ++++--
 .../spark/sql/parquet/parquetSuites.scala       | 151 ++++++++++++++++++-
 3 files changed, 230 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e080cc3e/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 072a4bc..3bd086d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -124,6 +124,13 @@ case class ParquetTableScan(
         conf)
 
     if (requestedPartitionOrdinals.nonEmpty) {
+      // This check is based on CatalystConverter.createRootConverter.
+      val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
+      // Uses temporary variable to avoid the whole `ParquetTableScan` object being captured into
+      // the `mapPartitionsWithInputSplit` closure below.
+      val outputSize = output.size
+
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
@@ -141,19 +148,46 @@ case class ParquetTableScan(
           relation.partitioningAttributes
            .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
-        new Iterator[Row] {
-          def hasNext = iter.hasNext
-          def next() = {
-            val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
-            // Parquet will leave partitioning columns empty, so we fill them in here.
-            var i = 0
-            while (i < requestedPartitionOrdinals.size) {
-              row(requestedPartitionOrdinals(i)._2) =
-                partitionRowValues(requestedPartitionOrdinals(i)._1)
-              i += 1
+        // Parquet will leave partitioning columns empty, so we fill them in here.
+        if (primitiveRow) {
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+              var i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                row(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              row
+            }
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(outputSize)
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = iter.next()._2.asInstanceOf[Row]
+
+              var i = 0
+              while (i < row.size) {
+                mutableRow(i) = row(i)
+                i += 1
+              }
+              i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                mutableRow(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              mutableRow
             }
-            row
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/e080cc3e/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 2e0c6c5..a7506c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -22,9 +22,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 
-import parquet.hadoop.ParquetInputFormat
+import parquet.hadoop.{ParquetInputSplit, ParquetInputFormat}
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
@@ -40,7 +39,7 @@ import scala.collection.JavaConversions._
 
 /**
  * Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`.  Currently the only option 
+ * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`.  Currently the only option
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
@@ -265,7 +264,10 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
     // When the data does not include the key and the key is requested then we must fill it in
     // based on information from the input split.
     if (!dataIncludesKey && partitionKeyLocation != -1) {
-      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
+      val primitiveRow =
+        requestedSchema.toAttributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
+      baseRDD.mapPartitionsWithInputSplit { case (split, iterator) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
           split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
@@ -273,15 +275,34 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
             .toString
             .split("/")
             .flatMap {
-            case partValue(key, value) => Some(key -> value)
-            case _ => None
-          }.toMap
-
-        val currentValue = partValues.values.head.toInt
-        iter.map { pair =>
-          val res = pair._2.asInstanceOf[SpecificMutableRow]
-          res.setInt(partitionKeyLocation, currentValue)
-          res
+              case partValue(key, value) => Some(key -> value)
+              case _ => None }
+            .toMap
+
+        if (primitiveRow) {
+          iterator.map { pair =>
+            // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+            val mutableRow = pair._2.asInstanceOf[SpecificMutableRow]
+            var i = 0
+            mutableRow.update(partitionKeyLocation, partValues.values.head.toInt)
+            mutableRow
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(requestedSchema.toAttributes.size)
+
+          iterator.map { pair =>
+            // We are using CatalystGroupConverter and it returns a GenericRow.
+            // Since GenericRow is not mutable, we just cast it to a Row.
+            val row = pair._2.asInstanceOf[Row]
+            var i = 0
+            while (i < row.size) {
+              mutableRow(i) = row(i)
+              i += 1
+            }
+            mutableRow.update(partitionKeyLocation, partValues.values.head.toInt)
+            mutableRow
+          }
         }
       }
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/e080cc3e/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 06fe144..d788b70 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -31,7 +31,20 @@ import org.apache.spark.sql.hive.test.TestHive._
 case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
 case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+case class StructContainer(intStructField: Int, stringStructField: String)
 
+case class ParquetDataWithComplexTypes(
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+    p: Int,
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
 
 /**
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
@@ -70,6 +83,38 @@ class ParquetMetastoreSuite extends ParquetTest {
     """)
 
     sql(s"""
+      create external table partitioned_parquet_with_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+    """)
+    
+    sql(s"""
+      create external table partitioned_parquet_with_key_and_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+    """)
+
+    sql(s"""
       create external table normal_parquet
       (
         intField INT,
@@ -90,10 +135,24 @@ class ParquetMetastoreSuite extends ParquetTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD 
PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION 
(p=$p)")
+    }
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
   override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS partitioned_parquet")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_key")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_complextypes")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_key_and_complextypes")
+    sql("DROP TABLE IF EXISTS normal_parquet")
+
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
@@ -139,6 +198,22 @@ class ParquetSourceSuite extends ParquetTest {
         path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
       )
     """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      )
+    """)
   }
 }
 
@@ -147,7 +222,10 @@ class ParquetSourceSuite extends ParquetTest {
  */
 abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
   var partitionedTableDir: File = null
+  var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null
+  var partitionedTableDirWithComplexTypes: File = null
 
   override def beforeAll(): Unit = {
     partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -161,6 +239,15 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
 
+    normalTableDir = File.createTempFile("parquettests", "sparksql")
+    normalTableDir.delete()
+    normalTableDir.mkdir()
+
+    sparkContext
+      .makeRDD(1 to 10)
+      .map(i => ParquetData(i, s"part-1"))
+      .saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath)
+
     partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
     partitionedTableDirWithKey.delete()
     partitionedTableDirWithKey.mkdir()
@@ -171,9 +258,46 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
         .map(i => ParquetDataWithKey(p, i, s"part-$p"))
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
+
+    partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithKeyAndComplexTypes(
+          p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithComplexTypes(
+          i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
   }
 
-  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+  override protected def afterAll(): Unit = {
+    //delete temporary files
+    partitionedTableDir.delete()
+    normalTableDir.delete()
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.delete()
+  }
+
+  Seq(
+    "partitioned_parquet",
+    "partitioned_parquet_with_key",
+    "partitioned_parquet_with_complextypes",
+    "partitioned_parquet_with_key_and_complextypes").foreach { table =>
     test(s"ordering of the partitioning columns $table") {
       checkAnswer(
         sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
@@ -186,6 +310,8 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
       )
     }
 
+
+
     test(s"project the partitioning column $table") {
       checkAnswer(
         sql(s"SELECT p, count(*) FROM $table group by p"),
@@ -263,6 +389,29 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  Seq(
+    "partitioned_parquet_with_key_and_complextypes",
+    "partitioned_parquet_with_complextypes").foreach { table =>
+    test(s"SPARK-5775 read structure from $table") {
+      checkAnswer(
+        sql(s"""
+          SELECT
+            p,
+            structField.intStructField,
+            structField.stringStructField
+          FROM $table
+          WHERE p = 1"""),
+        (1 to 10).map(i => Row(1, i, f"${i}_string")))
+    }
+
+    // Re-enable this after SPARK-5508 is fixed
+    ignore(s"SPARK-5775 read array from $table") {
+      checkAnswer(
+        sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1 to i, 1)))
+    }
+  }
+
   test("non-part select(*)") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM normal_parquet"),

