Repository: spark Updated Branches: refs/heads/master 3e6d567a4 -> 865e7cc38
http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala new file mode 100644 index 0000000..e6ef634 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.{InitContext, ReadSupport} +import org.apache.parquet.hadoop.api.ReadSupport.ReadContext +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema._ +import org.apache.parquet.schema.Type.Repetition + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ + +/** + * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst + * [[InternalRow]]s. + * + * The API interface of [[ReadSupport]] is a little bit over complicated because of historical + * reasons. In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] need to be + * instantiated and initialized twice on both driver side and executor side. The [[init()]] method + * is for driver side initialization, while [[prepareForRead()]] is for executor side. However, + * starting from parquet-mr 1.6.0, it's no longer the case, and [[ReadSupport]] is only instantiated + * and initialized on executor side. So, theoretically, now it's totally fine to combine these two + * methods into a single initialization method. The only reason (I could think of) to still have + * them here is for parquet-mr API backwards-compatibility. + * + * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] + * to [[prepareForRead()]], but use a private `var` for simplicity. + */ +private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with Logging { + private var catalystRequestedSchema: StructType = _ + + /** + * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record + * readers. Responsible for figuring out Parquet requested schema used for column pruning. + */ + override def init(context: InitContext): ReadContext = { + catalystRequestedSchema = { + val conf = context.getConfiguration + val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA) + assert(schemaString != null, "Parquet requested schema not set.") + StructType.fromString(schemaString) + } + + val parquetRequestedSchema = + ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) + + new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava) + } + + /** + * Called on executor side after [[init()]], before instantiating actual Parquet record readers. + * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet + * records to Catalyst [[InternalRow]]s. + */ + override def prepareForRead( + conf: Configuration, + keyValueMetaData: JMap[String, String], + fileSchema: MessageType, + readContext: ReadContext): RecordMaterializer[InternalRow] = { + log.debug(s"Preparing for read Parquet file with message type: $fileSchema") + val parquetRequestedSchema = readContext.getRequestedSchema + + logInfo { + s"""Going to read the following fields from the Parquet file: + | + |Parquet form: + |$parquetRequestedSchema + |Catalyst form: + |$catalystRequestedSchema + """.stripMargin + } + + new ParquetRecordMaterializer( + parquetRequestedSchema, + ParquetReadSupport.expandUDT(catalystRequestedSchema)) + } +} + +private[parquet] object ParquetReadSupport { + val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" + + val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata" + + /** + * Tailors `parquetSchema` according to `catalystSchema` by removing column paths don't exist + * in `catalystSchema`, and adding those only exist in `catalystSchema`. + */ + def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = { + val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema) + if (clippedParquetFields.isEmpty) { + ParquetSchemaConverter.EMPTY_MESSAGE + } else { + Types + .buildMessage() + .addFields(clippedParquetFields: _*) + .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) + } + } + + private def clipParquetType(parquetType: Type, catalystType: DataType): Type = { + catalystType match { + case t: ArrayType if !isPrimitiveCatalystType(t.elementType) => + // Only clips array types with nested type as element type. + clipParquetListType(parquetType.asGroupType(), t.elementType) + + case t: MapType + if !isPrimitiveCatalystType(t.keyType) || + !isPrimitiveCatalystType(t.valueType) => + // Only clips map types with nested key type or value type + clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType) + + case t: StructType => + clipParquetGroup(parquetType.asGroupType(), t) + + case _ => + // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able + // to be mapped to desired user-space types. So UDTs shouldn't participate schema merging. + parquetType + } + } + + /** + * Whether a Catalyst [[DataType]] is primitive. Primitive [[DataType]] is not equivalent to + * [[AtomicType]]. For example, [[CalendarIntervalType]] is primitive, but it's not an + * [[AtomicType]]. + */ + private def isPrimitiveCatalystType(dataType: DataType): Boolean = { + dataType match { + case _: ArrayType | _: MapType | _: StructType => false + case _ => true + } + } + + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type + * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a + * [[StructType]]. + */ + private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = { + // Precondition of this method, should only be called for lists with nested element types. + assert(!isPrimitiveCatalystType(elementType)) + + // Unannotated repeated group should be interpreted as required list of required element, so + // list element type is just the group itself. Clip it. + if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) { + clipParquetType(parquetList, elementType) + } else { + assert( + parquetList.getOriginalType == OriginalType.LIST, + "Invalid Parquet schema. " + + "Original type of annotated Parquet lists must be LIST: " + + parquetList.toString) + + assert( + parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED), + "Invalid Parquet schema. " + + "LIST-annotated group should only have exactly one repeated field: " + + parquetList) + + // Precondition of this method, should only be called for lists with nested element types. + assert(!parquetList.getType(0).isPrimitive) + + val repeatedGroup = parquetList.getType(0).asGroupType() + + // If the repeated field is a group with multiple fields, or the repeated field is a group + // with one field and is named either "array" or uses the LIST-annotated group's name with + // "_tuple" appended then the repeated type is the element type and elements are required. + // Build a new LIST-annotated group with clipped `repeatedGroup` as element type and the + // only field. + if ( + repeatedGroup.getFieldCount > 1 || + repeatedGroup.getName == "array" || + repeatedGroup.getName == parquetList.getName + "_tuple" + ) { + Types + .buildGroup(parquetList.getRepetition) + .as(OriginalType.LIST) + .addField(clipParquetType(repeatedGroup, elementType)) + .named(parquetList.getName) + } else { + // Otherwise, the repeated field's type is the element type with the repeated field's + // repetition. + Types + .buildGroup(parquetList.getRepetition) + .as(OriginalType.LIST) + .addField( + Types + .repeatedGroup() + .addField(clipParquetType(repeatedGroup.getType(0), elementType)) + .named(repeatedGroup.getName)) + .named(parquetList.getName) + } + } + } + + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. Either key type or + * value type of the [[MapType]] must be a nested type, namely an [[ArrayType]], a [[MapType]], or + * a [[StructType]]. + */ + private def clipParquetMapType( + parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = { + // Precondition of this method, only handles maps with nested key types or value types. + assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType)) + + val repeatedGroup = parquetMap.getType(0).asGroupType() + val parquetKeyType = repeatedGroup.getType(0) + val parquetValueType = repeatedGroup.getType(1) + + val clippedRepeatedGroup = + Types + .repeatedGroup() + .as(repeatedGroup.getOriginalType) + .addField(clipParquetType(parquetKeyType, keyType)) + .addField(clipParquetType(parquetValueType, valueType)) + .named(repeatedGroup.getName) + + Types + .buildGroup(parquetMap.getRepetition) + .as(parquetMap.getOriginalType) + .addField(clippedRepeatedGroup) + .named(parquetMap.getName) + } + + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]]. + * + * @return A clipped [[GroupType]], which has at least one field. + * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty + * [[MessageType]]. Because it's legal to construct an empty requested schema for column + * pruning. + */ + private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = { + val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType) + Types + .buildGroup(parquetRecord.getRepetition) + .as(parquetRecord.getOriginalType) + .addFields(clippedParquetFields: _*) + .named(parquetRecord.getName) + } + + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]]. + * + * @return A list of clipped [[GroupType]] fields, which can be empty. + */ + private def clipParquetGroupFields( + parquetRecord: GroupType, structType: StructType): Seq[Type] = { + val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap + val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false) + structType.map { f => + parquetFieldMap + .get(f.name) + .map(clipParquetType(_, f.dataType)) + .getOrElse(toParquet.convertField(f)) + } + } + + def expandUDT(schema: StructType): StructType = { + def expand(dataType: DataType): DataType = { + dataType match { + case t: ArrayType => + t.copy(elementType = expand(t.elementType)) + + case t: MapType => + t.copy( + keyType = expand(t.keyType), + valueType = expand(t.valueType)) + + case t: StructType => + val expandedFields = t.fields.map(f => f.copy(dataType = expand(f.dataType))) + t.copy(fields = expandedFields) + + case t: UserDefinedType[_] => + t.sqlType + + case t => + t + } + } + + expand(schema).asInstanceOf[StructType] + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala new file mode 100644 index 0000000..0818d80 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer} +import org.apache.parquet.schema.MessageType + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.StructType + +/** + * A [[RecordMaterializer]] for Catalyst rows. + * + * @param parquetSchema Parquet schema of the records to be read + * @param catalystSchema Catalyst schema of the rows to be constructed + */ +private[parquet] class ParquetRecordMaterializer( + parquetSchema: MessageType, catalystSchema: StructType) + extends RecordMaterializer[InternalRow] { + + private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater) + + override def getCurrentRecord: InternalRow = rootConverter.currentRecord + + override def getRootConverter: GroupConverter = rootConverter +} http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala new file mode 100644 index 0000000..9dad596 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -0,0 +1,672 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.math.{BigDecimal, BigInteger} +import java.nio.ByteOrder + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.parquet.column.Dictionary +import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} +import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type} +import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8} +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some + * corresponding parent container. For example, a converter for a `StructType` field may set + * converted values to a [[MutableRow]]; or a converter for array elements may append converted + * values to an [[ArrayBuffer]]. + */ +private[parquet] trait ParentContainerUpdater { + /** Called before a record field is being converted */ + def start(): Unit = () + + /** Called after a record field is being converted */ + def end(): Unit = () + + def set(value: Any): Unit = () + def setBoolean(value: Boolean): Unit = set(value) + def setByte(value: Byte): Unit = set(value) + def setShort(value: Short): Unit = set(value) + def setInt(value: Int): Unit = set(value) + def setLong(value: Long): Unit = set(value) + def setFloat(value: Float): Unit = set(value) + def setDouble(value: Double): Unit = set(value) +} + +/** A no-op updater used for root converter (who doesn't have a parent). */ +private[parquet] object NoopUpdater extends ParentContainerUpdater + +private[parquet] trait HasParentContainerUpdater { + def updater: ParentContainerUpdater +} + +/** + * A convenient converter class for Parquet group types with a [[HasParentContainerUpdater]]. + */ +private[parquet] abstract class ParquetGroupConverter(val updater: ParentContainerUpdater) + extends GroupConverter with HasParentContainerUpdater + +/** + * Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types + * are handled by this converter. Parquet primitive types are only a subset of those of Spark + * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet. + */ +private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpdater) + extends PrimitiveConverter with HasParentContainerUpdater { + + override def addBoolean(value: Boolean): Unit = updater.setBoolean(value) + override def addInt(value: Int): Unit = updater.setInt(value) + override def addLong(value: Long): Unit = updater.setLong(value) + override def addFloat(value: Float): Unit = updater.setFloat(value) + override def addDouble(value: Double): Unit = updater.setDouble(value) + override def addBinary(value: Binary): Unit = updater.set(value.getBytes) +} + +/** + * A [[ParquetRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s. + * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root + * converter. Take the following Parquet type as an example: + * {{{ + * message root { + * required int32 f1; + * optional group f2 { + * required double f21; + * optional binary f22 (utf8); + * } + * } + * }}} + * 5 converters will be created: + * + * - a root [[ParquetRowConverter]] for [[MessageType]] `root`, which contains: + * - a [[ParquetPrimitiveConverter]] for required [[INT_32]] field `f1`, and + * - a nested [[ParquetRowConverter]] for optional [[GroupType]] `f2`, which contains: + * - a [[ParquetPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and + * - a [[ParquetStringConverter]] for optional [[UTF8]] string field `f22` + * + * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have + * any "parent" container. + * + * @param parquetType Parquet schema of Parquet records + * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined + * types should have been expanded. + * @param updater An updater which propagates converted field values to the parent container + */ +private[parquet] class ParquetRowConverter( + parquetType: GroupType, + catalystType: StructType, + updater: ParentContainerUpdater) + extends ParquetGroupConverter(updater) with Logging { + + assert( + parquetType.getFieldCount == catalystType.length, + s"""Field counts of the Parquet schema and the Catalyst schema don't match: + | + |Parquet schema: + |$parquetType + |Catalyst schema: + |${catalystType.prettyJson} + """.stripMargin) + + assert( + !catalystType.existsRecursively(_.isInstanceOf[UserDefinedType[_]]), + s"""User-defined types in Catalyst schema should have already been expanded: + |${catalystType.prettyJson} + """.stripMargin) + + logDebug( + s"""Building row converter for the following schema: + | + |Parquet form: + |$parquetType + |Catalyst form: + |${catalystType.prettyJson} + """.stripMargin) + + /** + * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates + * converted filed values to the `ordinal`-th cell in `currentRow`. + */ + private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { + override def set(value: Any): Unit = row(ordinal) = value + override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value) + override def setByte(value: Byte): Unit = row.setByte(ordinal, value) + override def setShort(value: Short): Unit = row.setShort(ordinal, value) + override def setInt(value: Int): Unit = row.setInt(ordinal, value) + override def setLong(value: Long): Unit = row.setLong(ordinal, value) + override def setDouble(value: Double): Unit = row.setDouble(ordinal, value) + override def setFloat(value: Float): Unit = row.setFloat(ordinal, value) + } + + private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType)) + + private val unsafeProjection = UnsafeProjection.create(catalystType) + + /** + * The [[UnsafeRow]] converted from an entire Parquet record. + */ + def currentRecord: UnsafeRow = unsafeProjection(currentRow) + + // Converters for each field. + private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { + parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map { + case ((parquetFieldType, catalystField), ordinal) => + // Converted field value should be set to the `ordinal`-th cell of `currentRow` + newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal)) + }.toArray + } + + override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex) + + override def end(): Unit = { + var i = 0 + while (i < currentRow.numFields) { + fieldConverters(i).updater.end() + i += 1 + } + updater.set(currentRow) + } + + override def start(): Unit = { + var i = 0 + while (i < currentRow.numFields) { + fieldConverters(i).updater.start() + currentRow.setNullAt(i) + i += 1 + } + } + + /** + * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type + * `catalystType`. Converted values are handled by `updater`. + */ + private def newConverter( + parquetType: Type, + catalystType: DataType, + updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = { + + catalystType match { + case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => + new ParquetPrimitiveConverter(updater) + + case ByteType => + new ParquetPrimitiveConverter(updater) { + override def addInt(value: Int): Unit = + updater.setByte(value.asInstanceOf[ByteType#InternalType]) + } + + case ShortType => + new ParquetPrimitiveConverter(updater) { + override def addInt(value: Int): Unit = + updater.setShort(value.asInstanceOf[ShortType#InternalType]) + } + + // For INT32 backed decimals + case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 => + new ParquetIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater) + + // For INT64 backed decimals + case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 => + new ParquetLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater) + + // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals + case t: DecimalType + if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY || + parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY => + new ParquetBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater) + + case t: DecimalType => + throw new RuntimeException( + s"Unable to create Parquet converter for decimal type ${t.json} whose Parquet type is " + + s"$parquetType. Parquet DECIMAL type can only be backed by INT32, INT64, " + + "FIXED_LEN_BYTE_ARRAY, or BINARY.") + + case StringType => + new ParquetStringConverter(updater) + + case TimestampType => + // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. + new ParquetPrimitiveConverter(updater) { + // Converts nanosecond timestamps stored as INT96 + override def addBinary(value: Binary): Unit = { + assert( + value.length() == 12, + "Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " + + s"but got a ${value.length()}-byte binary.") + + val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) + val timeOfDayNanos = buf.getLong + val julianDay = buf.getInt + updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)) + } + } + + case DateType => + new ParquetPrimitiveConverter(updater) { + override def addInt(value: Int): Unit = { + // DateType is not specialized in `SpecificMutableRow`, have to box it here. + updater.set(value.asInstanceOf[DateType#InternalType]) + } + } + + // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor + // annotated by `LIST` or `MAP` should be interpreted as a required list of required + // elements where the element type is the type of the field. + case t: ArrayType if parquetType.getOriginalType != LIST => + if (parquetType.isPrimitive) { + new RepeatedPrimitiveConverter(parquetType, t.elementType, updater) + } else { + new RepeatedGroupConverter(parquetType, t.elementType, updater) + } + + case t: ArrayType => + new ParquetArrayConverter(parquetType.asGroupType(), t, updater) + + case t: MapType => + new ParquetMapConverter(parquetType.asGroupType(), t, updater) + + case t: StructType => + new ParquetRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater { + override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) + }) + + case t => + throw new RuntimeException( + s"Unable to create Parquet converter for data type ${t.json} " + + s"whose Parquet type is $parquetType") + } + } + + /** + * Parquet converter for strings. A dictionary is used to minimize string decoding cost. + */ + private final class ParquetStringConverter(updater: ParentContainerUpdater) + extends ParquetPrimitiveConverter(updater) { + + private var expandedDictionary: Array[UTF8String] = null + + override def hasDictionarySupport: Boolean = true + + override def setDictionary(dictionary: Dictionary): Unit = { + this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i => + UTF8String.fromBytes(dictionary.decodeToBinary(i).getBytes) + } + } + + override def addValueFromDictionary(dictionaryId: Int): Unit = { + updater.set(expandedDictionary(dictionaryId)) + } + + override def addBinary(value: Binary): Unit = { + // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we + // are using `Binary.toByteBuffer.array()` to steal the underlying byte array without copying + // it. + val buffer = value.toByteBuffer + val offset = buffer.arrayOffset() + buffer.position() + val numBytes = buffer.remaining() + updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes)) + } + } + + /** + * Parquet converter for fixed-precision decimals. + */ + private abstract class ParquetDecimalConverter( + precision: Int, scale: Int, updater: ParentContainerUpdater) + extends ParquetPrimitiveConverter(updater) { + + protected var expandedDictionary: Array[Decimal] = _ + + override def hasDictionarySupport: Boolean = true + + override def addValueFromDictionary(dictionaryId: Int): Unit = { + updater.set(expandedDictionary(dictionaryId)) + } + + // Converts decimals stored as INT32 + override def addInt(value: Int): Unit = { + addLong(value: Long) + } + + // Converts decimals stored as INT64 + override def addLong(value: Long): Unit = { + updater.set(decimalFromLong(value)) + } + + // Converts decimals stored as either FIXED_LENGTH_BYTE_ARRAY or BINARY + override def addBinary(value: Binary): Unit = { + updater.set(decimalFromBinary(value)) + } + + protected def decimalFromLong(value: Long): Decimal = { + Decimal(value, precision, scale) + } + + protected def decimalFromBinary(value: Binary): Decimal = { + if (precision <= Decimal.MAX_LONG_DIGITS) { + // Constructs a `Decimal` with an unscaled `Long` value if possible. + val unscaled = ParquetRowConverter.binaryToUnscaledLong(value) + Decimal(unscaled, precision, scale) + } else { + // Otherwise, resorts to an unscaled `BigInteger` instead. + Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale) + } + } + } + + private class ParquetIntDictionaryAwareDecimalConverter( + precision: Int, scale: Int, updater: ParentContainerUpdater) + extends ParquetDecimalConverter(precision, scale, updater) { + + override def setDictionary(dictionary: Dictionary): Unit = { + this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id => + decimalFromLong(dictionary.decodeToInt(id).toLong) + } + } + } + + private class ParquetLongDictionaryAwareDecimalConverter( + precision: Int, scale: Int, updater: ParentContainerUpdater) + extends ParquetDecimalConverter(precision, scale, updater) { + + override def setDictionary(dictionary: Dictionary): Unit = { + this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id => + decimalFromLong(dictionary.decodeToLong(id)) + } + } + } + + private class ParquetBinaryDictionaryAwareDecimalConverter( + precision: Int, scale: Int, updater: ParentContainerUpdater) + extends ParquetDecimalConverter(precision, scale, updater) { + + override def setDictionary(dictionary: Dictionary): Unit = { + this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id => + decimalFromBinary(dictionary.decodeToBinary(id)) + } + } + } + + /** + * Parquet converter for arrays. Spark SQL arrays are represented as Parquet lists. Standard + * Parquet lists are represented as a 3-level group annotated by `LIST`: + * {{{ + * <list-repetition> group <name> (LIST) { <-- parquetSchema points here + * repeated group list { + * <element-repetition> <element-type> element; + * } + * } + * }}} + * The `parquetSchema` constructor argument points to the outermost group. + * + * However, before this representation is standardized, some Parquet libraries/tools also use some + * non-standard formats to represent list-like structures. Backwards-compatibility rules for + * handling these cases are described in Parquet format spec. + * + * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists + */ + private final class ParquetArrayConverter( + parquetSchema: GroupType, + catalystSchema: ArrayType, + updater: ParentContainerUpdater) + extends ParquetGroupConverter(updater) { + + private var currentArray: ArrayBuffer[Any] = _ + + private val elementConverter: Converter = { + val repeatedType = parquetSchema.getType(0) + val elementType = catalystSchema.elementType + val parentName = parquetSchema.getName + + if (isElementType(repeatedType, elementType, parentName)) { + newConverter(repeatedType, elementType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentArray += value + }) + } else { + new ElementConverter(repeatedType.asGroupType().getType(0), elementType) + } + } + + override def getConverter(fieldIndex: Int): Converter = elementConverter + + override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray)) + + // NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for the + // next value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored + // in row cells. + override def start(): Unit = currentArray = ArrayBuffer.empty[Any] + + // scalastyle:off + /** + * Returns whether the given type is the element type of a list or is a syntactic group with + * one field that is the element type. This is determined by checking whether the type can be + * a syntactic group and by checking whether a potential syntactic group matches the expected + * schema. + * {{{ + * <list-repetition> group <name> (LIST) { + * repeated group list { <-- repeatedType points here + * <element-repetition> <element-type> element; + * } + * } + * }}} + * In short, here we handle Parquet list backwards-compatibility rules on the read path. This + * method is based on `AvroIndexedRecordConverter.isElementType`. + * + * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules + */ + // scalastyle:on + private def isElementType( + parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = { + (parquetRepeatedType, catalystElementType) match { + case (t: PrimitiveType, _) => true + case (t: GroupType, _) if t.getFieldCount > 1 => true + case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true + case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true + case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true + case _ => false + } + } + + /** Array element converter */ + private final class ElementConverter(parquetType: Type, catalystType: DataType) + extends GroupConverter { + + private var currentElement: Any = _ + + private val converter = newConverter(parquetType, catalystType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentElement = value + }) + + override def getConverter(fieldIndex: Int): Converter = converter + + override def end(): Unit = currentArray += currentElement + + override def start(): Unit = currentElement = null + } + } + + /** Parquet converter for maps */ + private final class ParquetMapConverter( + parquetType: GroupType, + catalystType: MapType, + updater: ParentContainerUpdater) + extends ParquetGroupConverter(updater) { + + private var currentKeys: ArrayBuffer[Any] = _ + private var currentValues: ArrayBuffer[Any] = _ + + private val keyValueConverter = { + val repeatedType = parquetType.getType(0).asGroupType() + new KeyValueConverter( + repeatedType.getType(0), + repeatedType.getType(1), + catalystType.keyType, + catalystType.valueType) + } + + override def getConverter(fieldIndex: Int): Converter = keyValueConverter + + override def end(): Unit = + updater.set(ArrayBasedMapData(currentKeys.toArray, currentValues.toArray)) + + // NOTE: We can't reuse the mutable Map here and must instantiate a new `Map` for the next + // value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored in row + // cells. + override def start(): Unit = { + currentKeys = ArrayBuffer.empty[Any] + currentValues = ArrayBuffer.empty[Any] + } + + /** Parquet converter for key-value pairs within the map. */ + private final class KeyValueConverter( + parquetKeyType: Type, + parquetValueType: Type, + catalystKeyType: DataType, + catalystValueType: DataType) + extends GroupConverter { + + private var currentKey: Any = _ + + private var currentValue: Any = _ + + private val converters = Array( + // Converter for keys + newConverter(parquetKeyType, catalystKeyType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentKey = value + }), + + // Converter for values + newConverter(parquetValueType, catalystValueType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentValue = value + })) + + override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) + + override def end(): Unit = { + currentKeys += currentKey + currentValues += currentValue + } + + override def start(): Unit = { + currentKey = null + currentValue = null + } + } + } + + private trait RepeatedConverter { + private var currentArray: ArrayBuffer[Any] = _ + + protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater { + override def start(): Unit = currentArray = ArrayBuffer.empty[Any] + override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray)) + override def set(value: Any): Unit = currentArray += value + } + } + + /** + * A primitive converter for converting unannotated repeated primitive values to required arrays + * of required primitives values. + */ + private final class RepeatedPrimitiveConverter( + parquetType: Type, + catalystType: DataType, + parentUpdater: ParentContainerUpdater) + extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater { + + val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater) + + private val elementConverter: PrimitiveConverter = + newConverter(parquetType, catalystType, updater).asPrimitiveConverter() + + override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value) + override def addInt(value: Int): Unit = elementConverter.addInt(value) + override def addLong(value: Long): Unit = elementConverter.addLong(value) + override def addFloat(value: Float): Unit = elementConverter.addFloat(value) + override def addDouble(value: Double): Unit = elementConverter.addDouble(value) + override def addBinary(value: Binary): Unit = elementConverter.addBinary(value) + + override def setDictionary(dict: Dictionary): Unit = elementConverter.setDictionary(dict) + override def hasDictionarySupport: Boolean = elementConverter.hasDictionarySupport + override def addValueFromDictionary(id: Int): Unit = elementConverter.addValueFromDictionary(id) + } + + /** + * A group converter for converting unannotated repeated group values to required arrays of + * required struct values. + */ + private final class RepeatedGroupConverter( + parquetType: Type, + catalystType: DataType, + parentUpdater: ParentContainerUpdater) + extends GroupConverter with HasParentContainerUpdater with RepeatedConverter { + + val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater) + + private val elementConverter: GroupConverter = + newConverter(parquetType, catalystType, updater).asGroupConverter() + + override def getConverter(field: Int): Converter = elementConverter.getConverter(field) + override def end(): Unit = elementConverter.end() + override def start(): Unit = elementConverter.start() + } +} + +private[parquet] object ParquetRowConverter { + def binaryToUnscaledLong(binary: Binary): Long = { + // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here + // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without + // copying it. + val buffer = binary.toByteBuffer + val bytes = buffer.array() + val start = buffer.arrayOffset() + buffer.position() + val end = buffer.arrayOffset() + buffer.limit() + + var unscaled = 0L + var i = start + + while (i < end) { + unscaled = (unscaled << 8) | (bytes(i) & 0xff) + i += 1 + } + + val bits = 8 * (end - start) + unscaled = (unscaled << (64 - bits)) >> (64 - bits) + unscaled + } + + def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = { + assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" + + s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.") + val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) + val timeOfDayNanos = buffer.getLong + val julianDay = buffer.getInt + DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala new file mode 100644 index 0000000..bcf535d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -0,0 +1,595 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.schema._ +import org.apache.parquet.schema.OriginalType._ +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ +import org.apache.parquet.schema.Type.Repetition._ + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.maxPrecisionForBytes +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +/** + * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] and + * vice versa. + * + * Parquet format backwards-compatibility rules are respected when converting Parquet + * [[MessageType]] schemas. + * + * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md + * @constructor + * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL + * [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL + * [[StructType]]. This argument only affects Parquet read path. + * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL + * [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL + * [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which + * has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS` + * described in Parquet format spec. This argument only affects Parquet read path. + * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4 + * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]]. + * When set to false, use standard format defined in parquet-format spec. This argument only + * affects Parquet write path. + */ +private[parquet] class ParquetSchemaConverter( + assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, + assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, + writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) { + + def this(conf: SQLConf) = this( + assumeBinaryIsString = conf.isParquetBinaryAsString, + assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp, + writeLegacyParquetFormat = conf.writeLegacyParquetFormat) + + def this(conf: Configuration) = this( + assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean, + assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean, + writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean) + + /** + * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]]. + */ + def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType()) + + private def convert(parquetSchema: GroupType): StructType = { + val fields = parquetSchema.getFields.asScala.map { field => + field.getRepetition match { + case OPTIONAL => + StructField(field.getName, convertField(field), nullable = true) + + case REQUIRED => + StructField(field.getName, convertField(field), nullable = false) + + case REPEATED => + // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor + // annotated by `LIST` or `MAP` should be interpreted as a required list of required + // elements where the element type is the type of the field. + val arrayType = ArrayType(convertField(field), containsNull = false) + StructField(field.getName, arrayType, nullable = false) + } + } + + StructType(fields) + } + + /** + * Converts a Parquet [[Type]] to a Spark SQL [[DataType]]. + */ + def convertField(parquetType: Type): DataType = parquetType match { + case t: PrimitiveType => convertPrimitiveField(t) + case t: GroupType => convertGroupField(t.asGroupType()) + } + + private def convertPrimitiveField(field: PrimitiveType): DataType = { + val typeName = field.getPrimitiveTypeName + val originalType = field.getOriginalType + + def typeString = + if (originalType == null) s"$typeName" else s"$typeName ($originalType)" + + def typeNotSupported() = + throw new AnalysisException(s"Parquet type not supported: $typeString") + + def typeNotImplemented() = + throw new AnalysisException(s"Parquet type not yet supported: $typeString") + + def illegalType() = + throw new AnalysisException(s"Illegal Parquet type: $typeString") + + // When maxPrecision = -1, we skip precision range check, and always respect the precision + // specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored + // as binaries with variable lengths. + def makeDecimalType(maxPrecision: Int = -1): DecimalType = { + val precision = field.getDecimalMetadata.getPrecision + val scale = field.getDecimalMetadata.getScale + + ParquetSchemaConverter.checkConversionRequirement( + maxPrecision == -1 || 1 <= precision && precision <= maxPrecision, + s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)") + + DecimalType(precision, scale) + } + + typeName match { + case BOOLEAN => BooleanType + + case FLOAT => FloatType + + case DOUBLE => DoubleType + + case INT32 => + originalType match { + case INT_8 => ByteType + case INT_16 => ShortType + case INT_32 | null => IntegerType + case DATE => DateType + case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS) + case UINT_8 => typeNotSupported() + case UINT_16 => typeNotSupported() + case UINT_32 => typeNotSupported() + case TIME_MILLIS => typeNotImplemented() + case _ => illegalType() + } + + case INT64 => + originalType match { + case INT_64 | null => LongType + case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS) + case UINT_64 => typeNotSupported() + case TIMESTAMP_MILLIS => typeNotImplemented() + case _ => illegalType() + } + + case INT96 => + ParquetSchemaConverter.checkConversionRequirement( + assumeInt96IsTimestamp, + "INT96 is not supported unless it's interpreted as timestamp. " + + s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.") + TimestampType + + case BINARY => + originalType match { + case UTF8 | ENUM | JSON => StringType + case null if assumeBinaryIsString => StringType + case null => BinaryType + case BSON => BinaryType + case DECIMAL => makeDecimalType() + case _ => illegalType() + } + + case FIXED_LEN_BYTE_ARRAY => + originalType match { + case DECIMAL => makeDecimalType(maxPrecisionForBytes(field.getTypeLength)) + case INTERVAL => typeNotImplemented() + case _ => illegalType() + } + + case _ => illegalType() + } + } + + private def convertGroupField(field: GroupType): DataType = { + Option(field.getOriginalType).fold(convert(field): DataType) { + // A Parquet list is represented as a 3-level structure: + // + // <list-repetition> group <name> (LIST) { + // repeated group list { + // <element-repetition> <element-type> element; + // } + // } + // + // However, according to the most recent Parquet format spec (not released yet up until + // writing), some 2-level structures are also recognized for backwards-compatibility. Thus, + // we need to check whether the 2nd level or the 3rd level refers to list element type. + // + // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists + case LIST => + ParquetSchemaConverter.checkConversionRequirement( + field.getFieldCount == 1, s"Invalid list type $field") + + val repeatedType = field.getType(0) + ParquetSchemaConverter.checkConversionRequirement( + repeatedType.isRepetition(REPEATED), s"Invalid list type $field") + + if (isElementType(repeatedType, field.getName)) { + ArrayType(convertField(repeatedType), containsNull = false) + } else { + val elementType = repeatedType.asGroupType().getType(0) + val optional = elementType.isRepetition(OPTIONAL) + ArrayType(convertField(elementType), containsNull = optional) + } + + // scalastyle:off + // `MAP_KEY_VALUE` is for backwards-compatibility + // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1 + // scalastyle:on + case MAP | MAP_KEY_VALUE => + ParquetSchemaConverter.checkConversionRequirement( + field.getFieldCount == 1 && !field.getType(0).isPrimitive, + s"Invalid map type: $field") + + val keyValueType = field.getType(0).asGroupType() + ParquetSchemaConverter.checkConversionRequirement( + keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2, + s"Invalid map type: $field") + + val keyType = keyValueType.getType(0) + ParquetSchemaConverter.checkConversionRequirement( + keyType.isPrimitive, + s"Map key type is expected to be a primitive type, but found: $keyType") + + val valueType = keyValueType.getType(1) + val valueOptional = valueType.isRepetition(OPTIONAL) + MapType( + convertField(keyType), + convertField(valueType), + valueContainsNull = valueOptional) + + case _ => + throw new AnalysisException(s"Unrecognized Parquet type: $field") + } + } + + // scalastyle:off + // Here we implement Parquet LIST backwards-compatibility rules. + // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules + // scalastyle:on + private def isElementType(repeatedType: Type, parentName: String): Boolean = { + { + // For legacy 2-level list types with primitive element type, e.g.: + // + // // List<Integer> (nullable list, non-null elements) + // optional group my_list (LIST) { + // repeated int32 element; + // } + // + repeatedType.isPrimitive + } || { + // For legacy 2-level list types whose element type is a group type with 2 or more fields, + // e.g.: + // + // // List<Tuple<String, Integer>> (nullable list, non-null elements) + // optional group my_list (LIST) { + // repeated group element { + // required binary str (UTF8); + // required int32 num; + // }; + // } + // + repeatedType.asGroupType().getFieldCount > 1 + } || { + // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.: + // + // // List<OneTuple<String>> (nullable list, non-null elements) + // optional group my_list (LIST) { + // repeated group array { + // required binary str (UTF8); + // }; + // } + // + repeatedType.getName == "array" + } || { + // For Parquet data generated by parquet-thrift, e.g.: + // + // // List<OneTuple<String>> (nullable list, non-null elements) + // optional group my_list (LIST) { + // repeated group my_list_tuple { + // required binary str (UTF8); + // }; + // } + // + repeatedType.getName == s"${parentName}_tuple" + } + } + + /** + * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]]. + */ + def convert(catalystSchema: StructType): MessageType = { + Types + .buildMessage() + .addFields(catalystSchema.map(convertField): _*) + .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) + } + + /** + * Converts a Spark SQL [[StructField]] to a Parquet [[Type]]. + */ + def convertField(field: StructField): Type = { + convertField(field, if (field.nullable) OPTIONAL else REQUIRED) + } + + private def convertField(field: StructField, repetition: Type.Repetition): Type = { + ParquetSchemaConverter.checkFieldName(field.name) + + field.dataType match { + // =================== + // Simple atomic types + // =================== + + case BooleanType => + Types.primitive(BOOLEAN, repetition).named(field.name) + + case ByteType => + Types.primitive(INT32, repetition).as(INT_8).named(field.name) + + case ShortType => + Types.primitive(INT32, repetition).as(INT_16).named(field.name) + + case IntegerType => + Types.primitive(INT32, repetition).named(field.name) + + case LongType => + Types.primitive(INT64, repetition).named(field.name) + + case FloatType => + Types.primitive(FLOAT, repetition).named(field.name) + + case DoubleType => + Types.primitive(DOUBLE, repetition).named(field.name) + + case StringType => + Types.primitive(BINARY, repetition).as(UTF8).named(field.name) + + case DateType => + Types.primitive(INT32, repetition).as(DATE).named(field.name) + + // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec. + // + // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond + // timestamp in Impala for some historical reasons. It's not recommended to be used for any + // other types and will probably be deprecated in some future version of parquet-format spec. + // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and + // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`. + // + // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting + // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store + // a timestamp into a `Long`. This design decision is subject to change though, for example, + // we may resort to microsecond precision in the future. + // + // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's + // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using) + // hasn't implemented `TIMESTAMP_MICROS` yet. + // + // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that. + case TimestampType => + Types.primitive(INT96, repetition).named(field.name) + + case BinaryType => + Types.primitive(BINARY, repetition).named(field.name) + + // ====================== + // Decimals (legacy mode) + // ====================== + + // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and + // always store decimals in fixed-length byte arrays. To keep compatibility with these older + // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated + // by `DECIMAL`. + case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat => + Types + .primitive(FIXED_LEN_BYTE_ARRAY, repetition) + .as(DECIMAL) + .precision(precision) + .scale(scale) + .length(ParquetSchemaConverter.minBytesForPrecision(precision)) + .named(field.name) + + // ======================== + // Decimals (standard mode) + // ======================== + + // Uses INT32 for 1 <= precision <= 9 + case DecimalType.Fixed(precision, scale) + if precision <= Decimal.MAX_INT_DIGITS && !writeLegacyParquetFormat => + Types + .primitive(INT32, repetition) + .as(DECIMAL) + .precision(precision) + .scale(scale) + .named(field.name) + + // Uses INT64 for 1 <= precision <= 18 + case DecimalType.Fixed(precision, scale) + if precision <= Decimal.MAX_LONG_DIGITS && !writeLegacyParquetFormat => + Types + .primitive(INT64, repetition) + .as(DECIMAL) + .precision(precision) + .scale(scale) + .named(field.name) + + // Uses FIXED_LEN_BYTE_ARRAY for all other precisions + case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat => + Types + .primitive(FIXED_LEN_BYTE_ARRAY, repetition) + .as(DECIMAL) + .precision(precision) + .scale(scale) + .length(ParquetSchemaConverter.minBytesForPrecision(precision)) + .named(field.name) + + // =================================== + // ArrayType and MapType (legacy mode) + // =================================== + + // Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level + // `LIST` structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro + // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element + // field name "array" is borrowed from parquet-avro. + case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat => + // <list-repetition> group <name> (LIST) { + // optional group bag { + // repeated <element-type> array; + // } + // } + ConversionPatterns.listType( + repetition, + field.name, + Types + .buildGroup(REPEATED) + // "array_element" is the name chosen by parquet-hive (1.7.0 and prior version) + .addField(convertField(StructField("array", elementType, nullable))) + .named("bag")) + + // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level + // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is + // covered by the backwards-compatibility rules implemented in `isElementType()`. + case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat => + // <list-repetition> group <name> (LIST) { + // repeated <element-type> element; + // } + ConversionPatterns.listType( + repetition, + field.name, + // "array" is the name chosen by parquet-avro (1.7.0 and prior version) + convertField(StructField("array", elementType, nullable), REPEATED)) + + // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by + // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`. + case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat => + // <map-repetition> group <name> (MAP) { + // repeated group map (MAP_KEY_VALUE) { + // required <key-type> key; + // <value-repetition> <value-type> value; + // } + // } + ConversionPatterns.mapType( + repetition, + field.name, + convertField(StructField("key", keyType, nullable = false)), + convertField(StructField("value", valueType, valueContainsNull))) + + // ===================================== + // ArrayType and MapType (standard mode) + // ===================================== + + case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat => + // <list-repetition> group <name> (LIST) { + // repeated group list { + // <element-repetition> <element-type> element; + // } + // } + Types + .buildGroup(repetition).as(LIST) + .addField( + Types.repeatedGroup() + .addField(convertField(StructField("element", elementType, containsNull))) + .named("list")) + .named(field.name) + + case MapType(keyType, valueType, valueContainsNull) => + // <map-repetition> group <name> (MAP) { + // repeated group key_value { + // required <key-type> key; + // <value-repetition> <value-type> value; + // } + // } + Types + .buildGroup(repetition).as(MAP) + .addField( + Types + .repeatedGroup() + .addField(convertField(StructField("key", keyType, nullable = false))) + .addField(convertField(StructField("value", valueType, valueContainsNull))) + .named("key_value")) + .named(field.name) + + // =========== + // Other types + // =========== + + case StructType(fields) => + fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) => + builder.addField(convertField(field)) + }.named(field.name) + + case udt: UserDefinedType[_] => + convertField(field.copy(dataType = udt.sqlType)) + + case _ => + throw new AnalysisException(s"Unsupported data type $field.dataType") + } + } +} + +private[parquet] object ParquetSchemaConverter { + val SPARK_PARQUET_SCHEMA_NAME = "spark_schema" + + // !! HACK ALERT !! + // + // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing empty GroupType, + // which prevents us to avoid selecting any columns for queries like `SELECT COUNT(*) FROM t`. + // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT. + // + // To workaround this problem, here we first construct a `MessageType` with a single dummy + // field, and then remove the field to obtain an empty `MessageType`. + // + // TODO Reverts this change after upgrading parquet-mr to 1.8.2+ + val EMPTY_MESSAGE = Types + .buildMessage() + .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy") + .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) + EMPTY_MESSAGE.getFields.clear() + + def checkFieldName(name: String): Unit = { + // ,;{}()\n\t= and space are special characters in Parquet schema + checkConversionRequirement( + !name.matches(".*[ ,;{}()\n\t=].*"), + s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=". + |Please use alias to rename it. + """.stripMargin.split("\n").mkString(" ").trim) + } + + def checkFieldNames(schema: StructType): StructType = { + schema.fieldNames.foreach(checkFieldName) + schema + } + + def checkConversionRequirement(f: => Boolean, message: String): Unit = { + if (!f) { + throw new AnalysisException(message) + } + } + + private def computeMinBytesForPrecision(precision : Int) : Int = { + var numBytes = 1 + while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) { + numBytes += 1 + } + numBytes + } + + // Returns the minimum number of bytes needed to store a decimal with a given `precision`. + val minBytesForPrecision = Array.tabulate[Int](39)(computeMinBytesForPrecision) + + // Max precision of a decimal value stored in `numBytes` bytes + def maxPrecisionForBytes(numBytes: Int): Int = { + Math.round( // convert double to long + Math.floor(Math.log10( // number of base-10 digits + Math.pow(2, 8 * numBytes - 1) - 1))) // max value stored in numBytes + .asInstanceOf[Int] + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 92f2db3..fc9ce6b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -362,7 +362,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))) assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE))) - val expectedSchema = new CatalystSchemaConverter().convert(schema) + val expectedSchema = new ParquetSchemaConverter().convert(schema) val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema actualSchema.checkContains(expectedSchema) @@ -432,7 +432,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { """.stripMargin) withTempPath { location => - val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) + val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) val path = new Path(location.getCanonicalPath) val conf = spark.sessionState.newHadoopConf() writeMetadata(parquetSchema, path, conf, extraMetadata) http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index b4fd0ef..83d1001 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -574,7 +574,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext test("expand UDT in StructType") { val schema = new StructType().add("n", new NestedStructUDT, nullable = true) val expected = new StructType().add("n", new NestedStructUDT().sqlType, nullable = true) - assert(CatalystReadSupport.expandUDT(schema) === expected) + assert(ParquetReadSupport.expandUDT(schema) === expected) } test("expand UDT in ArrayType") { @@ -592,7 +592,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext containsNull = false), nullable = true) - assert(CatalystReadSupport.expandUDT(schema) === expected) + assert(ParquetReadSupport.expandUDT(schema) === expected) } test("expand UDT in MapType") { @@ -612,7 +612,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext valueContainsNull = false), nullable = true) - assert(CatalystReadSupport.expandUDT(schema) === expected) + assert(ParquetReadSupport.expandUDT(schema) === expected) } test("returning batch for wide table") { http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 865bfa2..8a980a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -54,7 +54,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { binaryAsString: Boolean, int96AsTimestamp: Boolean, writeLegacyParquetFormat: Boolean): Unit = { - val converter = new CatalystSchemaConverter( + val converter = new ParquetSchemaConverter( assumeBinaryIsString = binaryAsString, assumeInt96IsTimestamp = int96AsTimestamp, writeLegacyParquetFormat = writeLegacyParquetFormat) @@ -78,7 +78,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { binaryAsString: Boolean, int96AsTimestamp: Boolean, writeLegacyParquetFormat: Boolean): Unit = { - val converter = new CatalystSchemaConverter( + val converter = new ParquetSchemaConverter( assumeBinaryIsString = binaryAsString, assumeInt96IsTimestamp = int96AsTimestamp, writeLegacyParquetFormat = writeLegacyParquetFormat) @@ -1062,7 +1062,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { catalystSchema: StructType, expectedSchema: MessageType): Unit = { test(s"Clipping - $testName") { - val actual = CatalystReadSupport.clipParquetSchema( + val actual = ParquetReadSupport.clipParquetSchema( MessageTypeParser.parseMessageType(parquetSchema), catalystSchema) try { @@ -1424,7 +1424,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { catalystSchema = new StructType(), - expectedSchema = CatalystSchemaConverter.EMPTY_MESSAGE) + expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE) testSchemaClipping( "disjoint field sets", http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index 1953d6f..9fb34e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -124,8 +124,8 @@ private[sql] trait ParquetTest extends SQLTestUtils { protected def writeMetadata( schema: StructType, path: Path, configuration: Configuration): Unit = { - val parquetSchema = new CatalystSchemaConverter().convert(schema) - val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schema.json).asJava + val parquetSchema = new ParquetSchemaConverter().convert(schema) + val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schema.json).asJava val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}" val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy) val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
