Repository: spark Updated Branches: refs/heads/master 73b3089b8 -> 727cb25bc
[SPARK-3036][SPARK-3037][SQL] Add MapType/ArrayType containing null value support to Parquet. JIRA: - https://issues.apache.org/jira/browse/SPARK-3036 - https://issues.apache.org/jira/browse/SPARK-3037 Currently this uses the following Parquet schema for `MapType` when `valueContainsNull` is `true`: ``` message root { optional group a (MAP) { repeated group map (MAP_KEY_VALUE) { required int32 key; optional int32 value; } } } ``` for `ArrayType` when `containsNull` is `true`: ``` message root { optional group a (LIST) { repeated group bag { optional int32 array; } } } ``` We have to think about compatibilities with older version of Spark or Hive or others I mentioned in the JIRA issues. Notice: This PR is based on #1963 and #1889. Please check them first. /cc marmbrus, yhuai Author: Takuya UESHIN <[email protected]> Closes #2032 from ueshin/issues/SPARK-3036_3037 and squashes the following commits: 4e8e9e7 [Takuya UESHIN] Add ArrayType containing null value support to Parquet. 013c2ca [Takuya UESHIN] Add MapType containing null value support to Parquet. 62989de [Takuya UESHIN] Merge branch 'issues/SPARK-2969' into issues/SPARK-3036_3037 8e38b53 [Takuya UESHIN] Merge branch 'issues/SPARK-3063' into issues/SPARK-3036_3037 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/727cb25b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/727cb25b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/727cb25b Branch: refs/heads/master Commit: 727cb25bcc29481d6b744abef1ca091e64b5f91f Parents: 73b3089 Author: Takuya UESHIN <[email protected]> Authored: Tue Aug 26 18:28:41 2014 -0700 Committer: Michael Armbrust <[email protected]> Committed: Tue Aug 26 18:28:41 2014 -0700 ---------------------------------------------------------------------- .../spark/sql/parquet/ParquetConverter.scala | 83 ++++++++++++++++++++ .../spark/sql/parquet/ParquetTableSupport.scala | 54 ++++++++----- .../apache/spark/sql/parquet/ParquetTypes.scala | 54 +++++++++---- .../spark/sql/parquet/ParquetQuerySuite.scala | 16 +++- 4 files changed, 167 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/727cb25b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index ef4526e..9fd6aed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -58,6 +58,7 @@ private[sql] object CatalystConverter { // This is mostly Parquet convention (see, e.g., `ConversionPatterns`). // Note that "array" for the array elements is chosen by ParquetAvro. // Using a different value will result in Parquet silently dropping columns. + val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag" val ARRAY_ELEMENTS_SCHEMA_NAME = "array" val MAP_KEY_SCHEMA_NAME = "key" val MAP_VALUE_SCHEMA_NAME = "value" @@ -82,6 +83,9 @@ private[sql] object CatalystConverter { case ArrayType(elementType: DataType, false) => { new CatalystArrayConverter(elementType, fieldIndex, parent) } + case ArrayType(elementType: DataType, true) => { + new CatalystArrayContainsNullConverter(elementType, fieldIndex, parent) + } case StructType(fields: Seq[StructField]) => { new CatalystStructConverter(fields.toArray, fieldIndex, parent) } @@ -568,6 +572,85 @@ private[parquet] class CatalystNativeArrayConverter( } /** + * A `parquet.io.api.GroupConverter` that converts a single-element groups that + * match the characteristics of an array contains null (see + * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an + * [[org.apache.spark.sql.catalyst.types.ArrayType]]. + * + * @param elementType The type of the array elements (complex or primitive) + * @param index The position of this (array) field inside its parent converter + * @param parent The parent converter + * @param buffer A data buffer + */ +private[parquet] class CatalystArrayContainsNullConverter( + val elementType: DataType, + val index: Int, + protected[parquet] val parent: CatalystConverter, + protected[parquet] var buffer: Buffer[Any]) + extends CatalystConverter { + + def this(elementType: DataType, index: Int, parent: CatalystConverter) = + this( + elementType, + index, + parent, + new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE)) + + protected[parquet] val converter: Converter = new CatalystConverter { + + private var current: Any = null + + val converter = CatalystConverter.createConverter( + new CatalystConverter.FieldType( + CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + elementType, + false), + fieldIndex = 0, + parent = this) + + override def getConverter(fieldIndex: Int): Converter = converter + + override def end(): Unit = parent.updateField(index, current) + + override def start(): Unit = { + current = null + } + + override protected[parquet] val size: Int = 1 + override protected[parquet] val index: Int = 0 + override protected[parquet] val parent = CatalystArrayContainsNullConverter.this + + override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { + current = value + } + + override protected[parquet] def clearBuffer(): Unit = {} + } + + override def getConverter(fieldIndex: Int): Converter = converter + + // arrays have only one (repeated) field, which is its elements + override val size = 1 + + override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { + buffer += value + } + + override protected[parquet] def clearBuffer(): Unit = { + buffer.clear() + } + + override def start(): Unit = {} + + override def end(): Unit = { + assert(parent != null) + // here we need to make sure to use ArrayScalaType + parent.updateField(index, buffer.toArray.toSeq) + clearBuffer() + } +} + +/** * This converter is for multi-element groups of primitive or complex types * that have repetition level optional or required (so struct fields). * http://git-wip-us.apache.org/repos/asf/spark/blob/727cb25b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 6a657c2..bdf0240 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -173,7 +173,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { private[parquet] def writeValue(schema: DataType, value: Any): Unit = { if (value != null) { schema match { - case t @ ArrayType(_, false) => writeArray( + case t @ ArrayType(_, _) => writeArray( t, value.asInstanceOf[CatalystConverter.ArrayScalaType[_]]) case t @ MapType(_, _, _) => writeMap( @@ -228,45 +228,57 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { } } - // TODO: support null values, see - // https://issues.apache.org/jira/browse/SPARK-1649 private[parquet] def writeArray( schema: ArrayType, array: CatalystConverter.ArrayScalaType[_]): Unit = { val elementType = schema.elementType writer.startGroup() if (array.size > 0) { - writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - var i = 0 - while(i < array.size) { - writeValue(elementType, array(i)) - i = i + 1 + if (schema.containsNull) { + writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0) + var i = 0 + while (i < array.size) { + writer.startGroup() + if (array(i) != null) { + writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) + writeValue(elementType, array(i)) + writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) + } + writer.endGroup() + i = i + 1 + } + writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0) + } else { + writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) + var i = 0 + while (i < array.size) { + writeValue(elementType, array(i)) + i = i + 1 + } + writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) } - writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) } writer.endGroup() } - // TODO: support null values, see - // https://issues.apache.org/jira/browse/SPARK-1649 private[parquet] def writeMap( schema: MapType, map: CatalystConverter.MapScalaType[_, _]): Unit = { writer.startGroup() if (map.size > 0) { writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0) - writer.startGroup() - writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) - for(key <- map.keys) { + for ((key, value) <- map) { + writer.startGroup() + writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) writeValue(schema.keyType, key) + writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) + if (value != null) { + writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) + writeValue(schema.valueType, value) + writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) + } + writer.endGroup() } - writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) - writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) - for(value <- map.values) { - writeValue(schema.valueType, value) - } - writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) - writer.endGroup() writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0) } writer.endGroup() http://git-wip-us.apache.org/repos/asf/spark/blob/727cb25b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index af8cd0a..1a52377 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -119,7 +119,13 @@ private[parquet] object ParquetTypesConverter extends Logging { case ParquetOriginalType.LIST => { // TODO: check enums! assert(groupType.getFieldCount == 1) val field = groupType.getFields.apply(0) - ArrayType(toDataType(field, isBinaryAsString), containsNull = false) + if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) { + val bag = field.asGroupType() + assert(bag.getFieldCount == 1) + ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true) + } else { + ArrayType(toDataType(field, isBinaryAsString), containsNull = false) + } } case ParquetOriginalType.MAP => { assert( @@ -129,28 +135,32 @@ private[parquet] object ParquetTypesConverter extends Logging { assert( keyValueGroup.getFieldCount == 2, "Parquet Map type malformatted: nested group should have 2 (key, value) fields!") - val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED) + + val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString) - assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED) - // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true - // at here. - MapType(keyType, valueType) + MapType(keyType, valueType, + keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED) } case _ => { // Note: the order of these checks is important! if (correspondsToMap(groupType)) { // MapType val keyValueGroup = groupType.getFields.apply(0).asGroupType() - val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED) + + val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString) - assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED) - // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true - // at here. - MapType(keyType, valueType) + MapType(keyType, valueType, + keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED) } else if (correspondsToArray(groupType)) { // ArrayType - val elementType = toDataType(groupType.getFields.apply(0), isBinaryAsString) - ArrayType(elementType, containsNull = false) + val field = groupType.getFields.apply(0) + if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) { + val bag = field.asGroupType() + assert(bag.getFieldCount == 1) + ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true) + } else { + ArrayType(toDataType(field, isBinaryAsString), containsNull = false) + } } else { // everything else: StructType val fields = groupType .getFields @@ -249,13 +259,27 @@ private[parquet] object ParquetTypesConverter extends Logging { inArray = true) ConversionPatterns.listType(repetition, name, parquetElementType) } + case ArrayType(elementType, true) => { + val parquetElementType = fromDataType( + elementType, + CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + nullable = true, + inArray = false) + ConversionPatterns.listType( + repetition, + name, + new ParquetGroupType( + Repetition.REPEATED, + CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, + parquetElementType)) + } case StructType(structFields) => { val fields = structFields.map { field => fromDataType(field.dataType, field.name, field.nullable, inArray = false) } new ParquetGroupType(repetition, name, fields) } - case MapType(keyType, valueType, _) => { + case MapType(keyType, valueType, valueContainsNull) => { val parquetKeyType = fromDataType( keyType, @@ -266,7 +290,7 @@ private[parquet] object ParquetTypesConverter extends Logging { fromDataType( valueType, CatalystConverter.MAP_VALUE_SCHEMA_NAME, - nullable = false, + nullable = valueContainsNull, inArray = false) ConversionPatterns.mapType( repetition, http://git-wip-us.apache.org/repos/asf/spark/blob/727cb25b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 28f43b3..4219cc0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -78,7 +78,9 @@ case class AllDataTypesWithNonPrimitiveType( booleanField: Boolean, binaryField: Array[Byte], array: Seq[Int], - map: Map[Int, String], + arrayContainsNull: Seq[Option[Int]], + map: Map[Int, Long], + mapValueContainsNull: Map[Int, Option[Long]], data: Data) class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { @@ -287,7 +289,11 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA .map(x => AllDataTypesWithNonPrimitiveType( s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0, (0 to x).map(_.toByte).toArray, - (0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x")))) + (0 until x), + (0 until x).map(Option(_).filter(_ % 3 == 0)), + (0 until x).map(i => i -> i.toLong).toMap, + (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None), + Data((0 until x), Nested(x, s"$x")))) .saveAsParquetFile(tempDir) val result = parquetFile(tempDir).collect() range.foreach { @@ -302,8 +308,10 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA assert(result(i).getBoolean(7) === (i % 2 == 0)) assert(result(i)(8) === (0 to i).map(_.toByte).toArray) assert(result(i)(9) === (0 until i)) - assert(result(i)(10) === (0 until i).map(i => i -> s"$i").toMap) - assert(result(i)(11) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i"))))) + assert(result(i)(10) === (0 until i).map(i => if (i % 3 == 0) i else null)) + assert(result(i)(11) === (0 until i).map(i => i -> i.toLong).toMap) + assert(result(i)(12) === (0 until i).map(i => i -> i.toLong).toMap + (i -> null)) + assert(result(i)(13) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i"))))) } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
