Repository: spark
Updated Branches:
refs/heads/branch-1.5 e2c6ef810 -> 90245f65c
[SPARK-10005] [SQL] Fixes schema merging for nested structs
When merging schemas, we only handled first-level fields when converting
Parquet groups to `InternalRow`s, so nested struct fields were not handled properly.
For example, the schema of a Parquet file to be read can be:
```
message individual {
required group f1 {
optional binary f11 (utf8);
}
}
```
while the global (merged) schema is:
```
message global {
required group f1 {
optional binary f11 (utf8);
optional int32 f12;
}
}
```
This PR fixes the issue by padding missing fields, including fields of nested
structs, when creating the actual converters. Values of padded fields are always null.
Author: Cheng Lian <[email protected]>
Closes #8228 from liancheng/spark-10005/nested-schema-merging.
(cherry picked from commit ae2370e72f93db8a28b262e8252c55fe1fc9873c)
Signed-off-by: Yin Huai <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90245f65
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90245f65
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90245f65
Branch: refs/heads/branch-1.5
Commit: 90245f65c94a40d3210207abaf6f136f5ce2861f
Parents: e2c6ef8
Author: Cheng Lian <[email protected]>
Authored: Sun Aug 16 10:17:58 2015 -0700
Committer: Yin Huai <[email protected]>
Committed: Sun Aug 16 10:18:08 2015 -0700
----------------------------------------------------------------------
.../parquet/CatalystReadSupport.scala | 19 ++++--
.../parquet/CatalystRowConverter.scala | 70 ++++++++++++++++++--
.../parquet/CatalystSchemaConverter.scala | 15 +----
.../datasources/parquet/ParquetQuerySuite.scala | 30 ++++++++-
4 files changed, 112 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 4049795..a4679bb 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow]
with Logging {
+ // Called after `init()` when initializing Parquet record reader.
override def prepareForRead(
conf: Configuration,
keyValueMetaData: JMap[String, String],
@@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends
ReadSupport[InternalRow] with
// available if the target file is written by Spark SQL.
.orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
}.map(StructType.fromString).getOrElse {
- logDebug("Catalyst schema not available, falling back to Parquet
schema")
+ logInfo("Catalyst schema not available, falling back to Parquet
schema")
toCatalyst.convert(parquetRequestedSchema)
}
- logDebug(s"Catalyst schema used to read Parquet files:
$catalystRequestedSchema")
+ logInfo {
+ s"""Going to read the following fields from the Parquet file:
+ |
+ |Parquet form:
+ |$parquetRequestedSchema
+ |
+ |Catalyst form:
+ |$catalystRequestedSchema
+ """.stripMargin
+ }
+
new CatalystRecordMaterializer(parquetRequestedSchema,
catalystRequestedSchema)
}
+ // Called before `prepareForRead()` when initializing Parquet record reader.
override def init(context: InitContext): ReadContext = {
val conf = context.getConfiguration
// If the target file was written by Spark SQL, we should be able to find
a serialized Catalyst
- // schema of this file from its the metadata.
+ // schema of this file from its metadata.
val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
// Optional schema of requested columns, in the form of a string
serialized from a Catalyst
@@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends
ReadSupport[InternalRow] with
maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
- logInfo(s"Going to read Parquet file with these requested columns:
$parquetRequestedSchema")
new ReadContext(parquetRequestedSchema, metadata)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index ab5a6dd..18c5b50 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -25,9 +25,10 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter,
PrimitiveConverter}
-import org.apache.parquet.schema.OriginalType.LIST
+import org.apache.parquet.schema.OriginalType.{LIST, INT_32, UTF8}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
import org.apache.parquet.schema.Type.Repetition
-import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -88,12 +89,54 @@ private[parquet] class CatalystPrimitiveConverter(val
updater: ParentContainerUp
}
/**
- * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark
SQL [[InternalRow]]s.
- * Since any Parquet record is also a struct, this converter can also be used
as root converter.
+ * A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst
[[InternalRow]]s.
+ * Since Catalyst `StructType` is also a Parquet record, this converter can be
used as root
+ * converter. Take the following Parquet type as an example:
+ * {{{
+ * message root {
+ * required int32 f1;
+ * optional group f2 {
+ * required double f21;
+ * optional binary f22 (utf8);
+ * }
+ * }
+ * }}}
+ * 5 converters will be created:
+ *
+ * - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which
contains:
+ * - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and
+ * - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`,
which contains:
+ * - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`,
and
+ * - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22`
*
* When used as a root converter, [[NoopUpdater]] should be used since root
converters don't have
* any "parent" container.
*
+ * @note Constructor argument [[parquetType]] refers to requested fields of
the actual schema of the
+ * Parquet file being read, while constructor argument [[catalystType]]
refers to requested
+ * fields of the global schema. The key difference is that, in case of
schema merging,
+ * [[parquetType]] can be a subset of [[catalystType]]. For example,
it's possible to have
+ * the following [[catalystType]]:
+ * {{{
+ * new StructType()
+ * .add("f1", IntegerType, nullable = false)
+ * .add("f2", StringType, nullable = true)
+ * .add("f3", new StructType()
+ * .add("f31", DoubleType, nullable = false)
+ * .add("f32", IntegerType, nullable = true)
+ * .add("f33", StringType, nullable = true), nullable = false)
+ * }}}
+ * and the following [[parquetType]] (`f2` and `f32` are missing):
+ * {{{
+ * message root {
+ * required int32 f1;
+ * required group f3 {
+ * required double f31;
+ * optional binary f33 (utf8);
+ * }
+ * }
+ * }}}
+ *
* @param parquetType Parquet schema of Parquet records
* @param catalystType Spark SQL schema that corresponds to the Parquet record
type
* @param updater An updater which propagates converted field values to the
parent container
@@ -126,7 +169,24 @@ private[parquet] class CatalystRowConverter(
// Converters for each field.
private val fieldConverters: Array[Converter with HasParentContainerUpdater]
= {
- parquetType.getFields.zip(catalystType).zipWithIndex.map {
+ // In case of schema merging, `parquetType` can be a subset of
`catalystType`. We need to pad
+ // those missing fields and create converters for them, although values of
these fields are
+ // always null.
+ val paddedParquetFields = {
+ val parquetFields = parquetType.getFields
+ val parquetFieldNames = parquetFields.map(_.getName).toSet
+ val missingFields = catalystType.filterNot(f =>
parquetFieldNames.contains(f.name))
+
+ // We don't need to worry about feature flag arguments like
`assumeBinaryIsString` when
+ // creating the schema converter here, since values of missing fields
are always null.
+ val toParquet = new CatalystSchemaConverter()
+
+ (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f
=>
+ catalystType.indexWhere(_.name == f.getName)
+ }
+ }
+
+ paddedParquetFields.zip(catalystType).zipWithIndex.map {
case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of
`currentRow`
newConverter(parquetFieldType, catalystField.dataType, new
RowUpdater(currentRow, ordinal))
http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 275646e..535f068 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -72,18 +72,9 @@ private[parquet] class CatalystSchemaConverter(
followParquetFormatSpec = conf.followParquetFormatSpec)
def this(conf: Configuration) = this(
- assumeBinaryIsString =
- conf.getBoolean(
- SQLConf.PARQUET_BINARY_AS_STRING.key,
- SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
- assumeInt96IsTimestamp =
- conf.getBoolean(
- SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
- SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
- followParquetFormatSpec =
- conf.getBoolean(
- SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
- SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
+ assumeBinaryIsString =
conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
+ assumeInt96IsTimestamp =
conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+ followParquetFormatSpec =
conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean)
/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL
[[StructType]].
http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index e2f2a8c..b7b70c2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -21,7 +21,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -201,4 +201,32 @@ class ParquetQuerySuite extends QueryTest with ParquetTest
with SharedSQLContext
assert(Decimal("67123.45") === Decimal(decimal))
}
}
+
+ test("SPARK-10005 Schema merging for nested struct") {
+ val sqlContext = _sqlContext
+ import sqlContext.implicits._
+
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ def append(df: DataFrame): Unit = {
+ df.write.mode(SaveMode.Append).parquet(path)
+ }
+
+ // Note that both the following two DataFrames contain a single struct
column with multiple
+ // nested fields.
+ append((1 to 2).map(i => Tuple1((i, i))).toDF())
+ append((1 to 2).map(i => Tuple1((i, i, i))).toDF())
+
+ withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
+ checkAnswer(
+ sqlContext.read.option("mergeSchema", "true").parquet(path),
+ Seq(
+ Row(Row(1, 1, null)),
+ Row(Row(2, 2, null)),
+ Row(Row(1, 1, 1)),
+ Row(Row(2, 2, 2))))
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]