Repository: spark Updated Branches: refs/heads/master f3f4c87b3 -> e0fdd467e
[SPARK-6010] [SQL] Merging compatible Parquet schemas before computing splits `ReadContext.init` calls `InitContext.getMergedKeyValueMetadata`, which doesn't know how to merge conflicting user-defined key-value metadata and throws an exception. In our case, when dealing with different but compatible schemas, we have different Spark SQL schema JSON strings in different Parquet part-files, which causes this problem. Reading similar Parquet files generated by Hive doesn't suffer from this issue. In this PR, we manually merge the schemas before passing them to `ReadContext` to avoid the exception. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4768) <!-- Reviewable:end --> Author: Cheng Lian <[email protected]> Closes #4768 from liancheng/spark-6010 and squashes the following commits: 9002f0a [Cheng Lian] Fixes SPARK-6010 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0fdd467 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0fdd467 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0fdd467 Branch: refs/heads/master Commit: e0fdd467e277867d6bec5c6605cc1cabce70ac89 Parents: f3f4c87 Author: Cheng Lian <[email protected]> Authored: Wed Feb 25 15:15:22 2015 -0800 Committer: Michael Armbrust <[email protected]> Committed: Wed Feb 25 15:15:22 2015 -0800 ---------------------------------------------------------------------- .../sql/parquet/ParquetTableOperations.scala | 20 ++++++++++++++++++- .../apache/spark/sql/parquet/ParquetTest.scala | 5 +++++ .../ParquetPartitionDiscoverySuite.scala | 21 ++++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e0fdd467/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 6596645..4dc13b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -48,6 +48,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLConf import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row, _} import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.{Logging, SerializableWritable, TaskContext} /** @@ -459,13 +460,30 @@ private[parquet] class FilteringParquetRowInputFormat val getGlobalMetaData = classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]]) getGlobalMetaData.setAccessible(true) - val globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData] + var globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData] if (globalMetaData == null) { val splits = mutable.ArrayBuffer.empty[ParquetInputSplit] return splits } + Option(globalMetaData.getKeyValueMetaData.get(RowReadSupport.SPARK_METADATA_KEY)).foreach { + schemas => + val mergedSchema = schemas + .map(DataType.fromJson(_).asInstanceOf[StructType]) + .reduce(_ merge _) + .json + + val mergedMetadata = globalMetaData + .getKeyValueMetaData + .updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(mergedSchema))) + + globalMetaData = new GlobalMetaData( + globalMetaData.getSchema, + mergedMetadata, + globalMetaData.getCreatedBy) + } + val readContext = getReadSupport(configuration).init( new InitContext(configuration, globalMetaData.getKeyValueMetaData, 
http://git-wip-us.apache.org/repos/asf/spark/blob/e0fdd467/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala index 0fa2fe9..d6ea667 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala @@ -131,6 +131,11 @@ private[sql] trait ParquetTest { data.toDF().save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite) } + protected def makeParquetFile[T <: Product: ClassTag: TypeTag]( + df: DataFrame, path: File): Unit = { + df.save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite) + } + protected def makePartitionDir( basePath: File, defaultPartitionName: String, http://git-wip-us.apache.org/repos/asf/spark/blob/e0fdd467/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index 3bf0116..adb3c93 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -36,6 +36,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { override val sqlContext: SQLContext = TestSQLContext import sqlContext._ + import sqlContext.implicits._ val defaultPartitionName = "__NULL__" @@ -319,4 +320,24 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { } } } + + test("read partitioned table - merging compatible 
schemas") { + withTempDir { base => + makeParquetFile( + (1 to 10).map(i => Tuple1(i)).toDF("intField"), + makePartitionDir(base, defaultPartitionName, "pi" -> 1)) + + makeParquetFile( + (1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"), + makePartitionDir(base, defaultPartitionName, "pi" -> 2)) + + load(base.getCanonicalPath, "org.apache.spark.sql.parquet").registerTempTable("t") + + withTempTable("t") { + checkAnswer( + sql("SELECT * FROM t"), + (1 to 10).map(i => Row(i, null, 1)) ++ (1 to 10).map(i => Row(i, i.toString, 2))) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
