Repository: spark
Updated Branches:
  refs/heads/branch-1.2 abdcec673 -> e080cc3e5
[SPARK-5775] BugFix: GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table

The bug fixed here was due to a change in ParquetTableScan, when reading a partitioned table.

- When the partition column is requested out of a Parquet table, the table scan needs to add the column back to the output rows.
- To update the Row object created by ParquetTableScan, the Row was first cast to SpecificMutableRow before being updated.
- This cast was unsafe, since there is no guarantee that the NewHadoopRDD used internally will instantiate the output rows as MutableRow.

In particular, when reading a table with complex (e.g. struct or array) types, the NewHadoopRDD uses a parquet.io.api.RecordMaterializer that is produced by org.apache.spark.sql.parquet.RowReadSupport. When complex types are involved, the root converter is created as an org.apache.spark.sql.parquet.CatalystGroupConverter (a) and not as an org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter (b) (see the org.apache.spark.sql.parquet.CatalystConverter.createRootConverter factory). Converter (a) outputs GenericRow, while converter (b) produces SpecificMutableRow. Therefore any query selecting a partition column plus a complex-type column gets its results back as GenericRows and falls into the unsafe cast (see https://issues.apache.org/jira/browse/SPARK-5775 for an example).

The fix proposed here originally replaced the unsafe class cast with a case match on the Row type, updating the Row in place if it is of a mutable type, and recreating a Row otherwise. This PR now implements the solution updated by liancheng in aa39460d4bb4c41084d350ccb1c5a56cd61239b7: the fix checks whether every requested column is of primitive type, mirroring the check in org.apache.spark.sql.parquet.CatalystConverter.createRootConverter.

- If all columns are of primitive type, the Row can safely be cast to a MutableRow.
- Otherwise a new GenericMutableRow is created, and the partition column values are written into this new row structure.

This fix is unit-tested in sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Author: Anselme Vignon <[email protected]>
Author: Cheng Lian <[email protected]>

Closes #4697 from anselmevignon/local_dev and squashes the following commits:

6a4c53d [Anselme Vignon] style corrections
52f73fc [Cheng Lian] cherry-pick & merge from aa39460d4bb4c41084d350ccb1c5a56cd61239b7
8fc6a8c [Anselme Vignon] correcting tests on temporary tables
24928ea [Anselme Vignon] corrected mirror bug (see SPARK-5775) for newParquet
7c829cb [Anselme Vignon] bugfix, hopefully correct this time
005a7f8 [Anselme Vignon] added test cleanup
22cec52 [Anselme Vignon] lint compatible changes
ae48f7c [Anselme Vignon] unittesting SPARK-5775
f876dea [Anselme Vignon] starting to write tests
dbceaa3 [Anselme Vignon] cutting lines
4eb04e9 [Anselme Vignon] bugfix SPARK-5775

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e080cc3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e080cc3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e080cc3e

Branch: refs/heads/branch-1.2
Commit: e080cc3e5019379ac78323c03c1b991b433c6ca9
Parents: abdcec6
Author: Anselme Vignon <[email protected]>
Authored: Mon Mar 23 12:00:50 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Mon Mar 23 12:00:50 2015 -0700

----------------------------------------------------------------------
 .../sql/parquet/ParquetTableOperations.scala    |  58 +++++--
 .../apache/spark/sql/parquet/newParquet.scala   |  47 ++++--
 .../spark/sql/parquet/parquetSuites.scala       | 151 ++++++++++++++++++-
 3 files changed, 230 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
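The two code paths described above can be sketched in a few lines of standalone Scala. This is a simplified illustration only: `Row`, `GenericRow` and `GenericMutableRow` below are hypothetical stand-ins for the Spark SQL classes, and `fillPartitionColumn` is an invented helper, not the actual patched code.

```scala
// Minimal stand-ins for the Spark SQL row types (NOT the real classes).
trait Row { def size: Int; def apply(i: Int): Any }

class GenericRow(values: Array[Any]) extends Row {
  def size: Int = values.length
  def apply(i: Int): Any = values(i)
}

class GenericMutableRow(n: Int) extends Row {
  private val values = new Array[Any](n)
  def size: Int = n
  def apply(i: Int): Any = values(i)
  def update(i: Int, v: Any): Unit = values(i) = v
}

// Fills the partition value at `partOrdinal` into each output row.
// `primitiveRow` plays the role of the check against
// CatalystConverter.createRootConverter: only when every requested column
// is primitive is the incoming row known to be mutable.
def fillPartitionColumn(
    rows: Iterator[Row],
    primitiveRow: Boolean,
    outputSize: Int,
    partOrdinal: Int,
    partValue: Any): Iterator[Row] = {
  if (primitiveRow) {
    // Safe: primitive-only schemas yield mutable rows, patched in place.
    rows.map { r =>
      val mutable = r.asInstanceOf[GenericMutableRow]
      mutable.update(partOrdinal, partValue)
      mutable
    }
  } else {
    // GenericRow is immutable: copy its values into a reused mutable row,
    // then write the partition value into the copy.
    val mutableRow = new GenericMutableRow(outputSize)
    rows.map { r =>
      var i = 0
      while (i < r.size) {
        mutableRow(i) = r(i)
        i += 1
      }
      mutableRow.update(partOrdinal, partValue)
      mutableRow
    }
  }
}
```

The key design point is that the mutability decision is made once per partition, outside the per-row loop, symmetrically to how createRootConverter decides which converter to build.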
http://git-wip-us.apache.org/repos/asf/spark/blob/e080cc3e/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 072a4bc..3bd086d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -124,6 +124,13 @@ case class ParquetTableScan(
       conf)
 
     if (requestedPartitionOrdinals.nonEmpty) {
+      // This check is based on CatalystConverter.createRootConverter.
+      val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
+      // Uses temporary variable to avoid the whole `ParquetTableScan` object being captured into
+      // the `mapPartitionsWithInputSplit` closure below.
+      val outputSize = output.size
+
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
@@ -141,19 +148,46 @@ case class ParquetTableScan(
           relation.partitioningAttributes
             .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
-        new Iterator[Row] {
-          def hasNext = iter.hasNext
-          def next() = {
-            val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
-            // Parquet will leave partitioning columns empty, so we fill them in here.
-            var i = 0
-            while (i < requestedPartitionOrdinals.size) {
-              row(requestedPartitionOrdinals(i)._2) =
-                partitionRowValues(requestedPartitionOrdinals(i)._1)
-              i += 1
+        // Parquet will leave partitioning columns empty, so we fill them in here.
+        if (primitiveRow) {
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+              var i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                row(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              row
+            }
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(outputSize)
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = iter.next()._2.asInstanceOf[Row]
+
+              var i = 0
+              while (i < row.size) {
+                mutableRow(i) = row(i)
+                i += 1
+              }
+              i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                mutableRow(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              mutableRow
             }
-            row
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/e080cc3e/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 2e0c6c5..a7506c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -22,9 +22,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
-import parquet.hadoop.ParquetInputFormat
+import parquet.hadoop.{ParquetInputSplit, ParquetInputFormat}
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
@@ -40,7 +39,7 @@ import scala.collection.JavaConversions._
 
 /**
  * Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`.  Currently the only option
+ * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
@@ -265,7 +264,10 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
     // When the data does not include the key and the key is requested then we must fill it in
     // based on information from the input split.
     if (!dataIncludesKey && partitionKeyLocation != -1) {
-      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
+      val primitiveRow =
+        requestedSchema.toAttributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
+      baseRDD.mapPartitionsWithInputSplit { case (split, iterator) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
           split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
@@ -273,15 +275,34 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
             .toString
             .split("/")
             .flatMap {
-            case partValue(key, value) => Some(key -> value)
-            case _ => None
-          }.toMap
-
-        val currentValue = partValues.values.head.toInt
-        iter.map { pair =>
-          val res = pair._2.asInstanceOf[SpecificMutableRow]
-          res.setInt(partitionKeyLocation, currentValue)
-          res
+              case partValue(key, value) => Some(key -> value)
+              case _ => None
+            }
+            .toMap
+
+        if (primitiveRow) {
+          iterator.map { pair =>
+            // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+            val mutableRow = pair._2.asInstanceOf[SpecificMutableRow]
+            var i = 0
+            mutableRow.update(partitionKeyLocation, partValues.values.head.toInt)
+            mutableRow
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(requestedSchema.toAttributes.size)
+
+          iterator.map { pair =>
+            // We are using CatalystGroupConverter and it returns a GenericRow.
+            // Since GenericRow is not mutable, we just cast it to a Row.
+            val row = pair._2.asInstanceOf[Row]
+            var i = 0
+            while (i < row.size) {
+              mutableRow(i) = row(i)
+              i += 1
+            }
+            mutableRow.update(partitionKeyLocation, partValues.values.head.toInt)
+            mutableRow
+          }
         }
       }
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/e080cc3e/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 06fe144..d788b70 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -31,7 +31,20 @@ import org.apache.spark.sql.hive.test.TestHive._
 case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
 case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+case class StructContainer(intStructField: Int, stringStructField: String)
+
+case class ParquetDataWithComplexTypes(
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+    p: Int,
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
 
 /**
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
@@ -70,6 +83,38 @@ class ParquetMetastoreSuite extends ParquetTest {
     """)
 
     sql(s"""
+      create external table partitioned_parquet_with_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      create external table partitioned_parquet_with_key_and_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+    """)
+
+    sql(s"""
       create external table normal_parquet
       (
         intField INT,
@@ -90,10 +135,24 @@ class ParquetMetastoreSuite extends ParquetTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
+    }
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
   override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS partitioned_parquet")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_key")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_complextypes")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_key_and_complextypes")
+    sql("DROP TABLE IF EXISTS normal_parquet")
+
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
@@ -139,6 +198,22 @@ class ParquetSourceSuite extends ParquetTest {
         path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
       )
     """)
+
+    sql(
+      s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      )
+    """)
+
+    sql(
+      s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      )
+    """)
   }
 }
 
@@ -147,7 +222,10 @@ class ParquetSourceSuite extends ParquetTest {
  */
 abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
   var partitionedTableDir: File = null
+  var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null
+  var partitionedTableDirWithComplexTypes: File = null
 
   override def beforeAll(): Unit = {
     partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -161,6 +239,15 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
 
+    normalTableDir = File.createTempFile("parquettests", "sparksql")
+    normalTableDir.delete()
+    normalTableDir.mkdir()
+
+    sparkContext
+      .makeRDD(1 to 10)
+      .map(i => ParquetData(i, s"part-1"))
+      .saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath)
+
     partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
     partitionedTableDirWithKey.delete()
     partitionedTableDirWithKey.mkdir()
@@ -171,9 +258,46 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
         .map(i => ParquetDataWithKey(p, i, s"part-$p"))
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
+
+    partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithKeyAndComplexTypes(
+          p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithComplexTypes(
+          i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
   }
 
-  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+  override protected def afterAll(): Unit = {
+    // delete temporary files
+    partitionedTableDir.delete()
+    normalTableDir.delete()
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.delete()
+  }
+
+  Seq(
+    "partitioned_parquet",
+    "partitioned_parquet_with_key",
+    "partitioned_parquet_with_complextypes",
+    "partitioned_parquet_with_key_and_complextypes").foreach { table =>
     test(s"ordering of the partitioning columns $table") {
       checkAnswer(
         sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
@@ -186,6 +310,8 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
       )
     }
 
+
+
     test(s"project the partitioning column $table") {
       checkAnswer(
         sql(s"SELECT p, count(*) FROM $table group by p"),
@@ -263,6 +389,29 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  Seq(
+    "partitioned_parquet_with_key_and_complextypes",
+    "partitioned_parquet_with_complextypes").foreach { table =>
+    test(s"SPARK-5775 read structure from $table") {
+      checkAnswer(
+        sql(s"""
+          SELECT
+            p,
+            structField.intStructField,
+            structField.stringStructField
+          FROM $table
+          WHERE p = 1"""),
+        (1 to 10).map(i => Row(1, i, f"${i}_string")))
+    }
+
+    // Re-enable this after SPARK-5508 is fixed
+    ignore(s"SPARK-5775 read array from $table") {
+      checkAnswer(
+        sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1 to i, 1)))
+    }
+  }
+
   test("non-part select(*)") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM normal_parquet"),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
