Repository: spark
Updated Branches:
  refs/heads/master 3e6d567a4 -> 865e7cc38


http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
new file mode 100644
index 0000000..e6ef634
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.Type.Repetition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+/**
+ * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
+ * [[InternalRow]]s.
+ *
+ * The [[ReadSupport]] API is a little overcomplicated for historical reasons.  In older versions
+ * of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] needed to be instantiated and
+ * initialized twice, once on the driver side and once on the executor side.  The [[init()]] method
+ * is for driver-side initialization, while [[prepareForRead()]] is for the executor side.  Starting
+ * from parquet-mr 1.6.0, however, this is no longer the case: [[ReadSupport]] is only instantiated
+ * and initialized on the executor side.  So, theoretically, the two methods could now be combined
+ * into a single initialization method.  The only reason to keep both is parquet-mr API
+ * backwards-compatibility.
+ *
+ * For this reason, we no longer rely on [[ReadContext]] to pass the requested schema from
+ * [[init()]] to [[prepareForRead()]], but use a private `var` for simplicity.
+ */
+private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with Logging {
+  private var catalystRequestedSchema: StructType = _
+
+  /**
+   * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record
+   * readers.  Responsible for figuring out Parquet requested schema used for column pruning.
+   */
+  override def init(context: InitContext): ReadContext = {
+    catalystRequestedSchema = {
+      val conf = context.getConfiguration
+      val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+      assert(schemaString != null, "Parquet requested schema not set.")
+      StructType.fromString(schemaString)
+    }
+
+    val parquetRequestedSchema =
+      ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
+   * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
+   * records to Catalyst [[InternalRow]]s.
+   */
+  override def prepareForRead(
+      conf: Configuration,
+      keyValueMetaData: JMap[String, String],
+      fileSchema: MessageType,
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+    log.debug(s"Preparing to read Parquet file with message type: $fileSchema")
+    val parquetRequestedSchema = readContext.getRequestedSchema
+
+    logInfo {
+      s"""Going to read the following fields from the Parquet file:
+         |
+         |Parquet form:
+         |$parquetRequestedSchema
+         |Catalyst form:
+         |$catalystRequestedSchema
+       """.stripMargin
+    }
+
+    new ParquetRecordMaterializer(
+      parquetRequestedSchema,
+      ParquetReadSupport.expandUDT(catalystRequestedSchema))
+  }
+}
+
+private[parquet] object ParquetReadSupport {
+  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
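+  // A minimal usage sketch (the helper name is made up, nothing else calls it): the requested
+  // Catalyst schema travels from the driver to executors through the Hadoop configuration under
+  // SPARK_ROW_REQUESTED_SCHEMA, and init() parses it back with StructType.fromString.
+  private def setRequestedSchemaExample(conf: Configuration, requestedSchema: StructType): Unit = {
+    conf.set(SPARK_ROW_REQUESTED_SCHEMA, requestedSchema.json)
+  }
+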
+  /**
+   * Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't
+   * exist in `catalystSchema`, and adding those that only exist in `catalystSchema`.
+   */
+  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+    if (clippedParquetFields.isEmpty) {
+      ParquetSchemaConverter.EMPTY_MESSAGE
+    } else {
+      Types
+        .buildMessage()
+        .addFields(clippedParquetFields: _*)
+        .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+    }
+  }
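+
+  // A minimal sketch with hypothetical schemas (not referenced elsewhere): requesting only
+  // column `a` from a wider file schema yields a MessageType containing just that column,
+  // which is what drives Parquet column pruning.
+  private def clipParquetSchemaExample(): MessageType = {
+    val fileSchema = MessageTypeParser.parseMessageType(
+      "message spark_schema { required int32 a; optional binary b (UTF8); }")
+    val requestedSchema = StructType(Seq(StructField("a", IntegerType, nullable = false)))
+    // => message spark_schema { required int32 a; }
+    clipParquetSchema(fileSchema, requestedSchema)
+  }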
+
+  private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+    catalystType match {
+      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
+        // Only clips array types with nested type as element type.
+        clipParquetListType(parquetType.asGroupType(), t.elementType)
+
+      case t: MapType
+        if !isPrimitiveCatalystType(t.keyType) ||
+           !isPrimitiveCatalystType(t.valueType) =>
+        // Only clips map types with nested key type or value type
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+
+      case t: StructType =>
+        clipParquetGroup(parquetType.asGroupType(), t)
+
+      case _ =>
+        // UDTs and primitive types are not clipped.  For UDTs, a clipped version might not be
+        // able to be mapped to the desired user-space types.  So UDTs shouldn't participate in
+        // schema merging.
+        parquetType
+    }
+  }
+
+  /**
+   * Whether a Catalyst [[DataType]] is primitive.  Primitive [[DataType]] is not equivalent to
+   * [[AtomicType]].  For example, [[CalendarIntervalType]] is primitive, but it's not an
+   * [[AtomicType]].
+   */
+  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: ArrayType | _: MapType | _: StructType => false
+      case _ => true
+    }
+  }
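+
+  // A small sketch of the distinction documented above: only container types are non-primitive
+  // here, so CalendarIntervalType counts as primitive even though it is not an AtomicType.
+  private def isPrimitiveCatalystTypeExample(): Seq[Boolean] = Seq(
+    isPrimitiveCatalystType(CalendarIntervalType),                    // true
+    isPrimitiveCatalystType(ArrayType(IntegerType)),                  // false
+    isPrimitiveCatalystType(new StructType().add("a", IntegerType)))  // false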
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]].  The element type
+   * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
+   * [[StructType]].
+   */
+  private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+    // Precondition of this method, should only be called for lists with nested element types.
+    assert(!isPrimitiveCatalystType(elementType))
+
+    // An unannotated repeated group should be interpreted as a required list of required
+    // elements, so the list element type is just the group itself.  Clip it.
+    if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
+      clipParquetType(parquetList, elementType)
+    } else {
+      assert(
+        parquetList.getOriginalType == OriginalType.LIST,
+        "Invalid Parquet schema. " +
+          "Original type of annotated Parquet lists must be LIST: " +
+          parquetList.toString)
+
+      assert(
+        parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED),
+        "Invalid Parquet schema. " +
+          "LIST-annotated group should only have exactly one repeated field: " +
+          parquetList)
+
+      // Precondition of this method, should only be called for lists with nested element types.
+      assert(!parquetList.getType(0).isPrimitive)
+
+      val repeatedGroup = parquetList.getType(0).asGroupType()
+
+      // If the repeated field is a group with multiple fields, or the repeated field is a group
+      // with one field and is named either "array" or uses the LIST-annotated group's name with
+      // "_tuple" appended, then the repeated type is the element type and elements are required.
+      // Build a new LIST-annotated group with the clipped `repeatedGroup` as element type and the
+      // only field.
+      if (
+        repeatedGroup.getFieldCount > 1 ||
+        repeatedGroup.getName == "array" ||
+        repeatedGroup.getName == parquetList.getName + "_tuple"
+      ) {
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(clipParquetType(repeatedGroup, elementType))
+          .named(parquetList.getName)
+      } else {
+        // Otherwise, the repeated field's type is the element type with the repeated field's
+        // repetition.
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+              .named(repeatedGroup.getName))
+          .named(parquetList.getName)
+      }
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]].  Either key type or
+   * value type of the [[MapType]] must be a nested type, namely an [[ArrayType]], a [[MapType]], or
+   * a [[StructType]].
+   */
+  private def clipParquetMapType(
+      parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+    // Precondition of this method, only handles maps with nested key types or value types.
+    assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
+
+    val repeatedGroup = parquetMap.getType(0).asGroupType()
+    val parquetKeyType = repeatedGroup.getType(0)
+    val parquetValueType = repeatedGroup.getType(1)
+
+    val clippedRepeatedGroup =
+      Types
+        .repeatedGroup()
+        .as(repeatedGroup.getOriginalType)
+        .addField(clipParquetType(parquetKeyType, keyType))
+        .addField(clipParquetType(parquetValueType, valueType))
+        .named(repeatedGroup.getName)
+
+    Types
+      .buildGroup(parquetMap.getRepetition)
+      .as(parquetMap.getOriginalType)
+      .addField(clippedRepeatedGroup)
+      .named(parquetMap.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A clipped [[GroupType]], which has at least one field.
+   * @note Parquet doesn't allow creating empty [[GroupType]] instances except for the empty
+   *       [[MessageType]], even though it's legal to construct an empty requested schema for
+   *       column pruning.
+   */
+  private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+    Types
+      .buildGroup(parquetRecord.getRepetition)
+      .as(parquetRecord.getOriginalType)
+      .addFields(clippedParquetFields: _*)
+      .named(parquetRecord.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A list of clipped [[GroupType]] fields, which can be empty.
+   */
+  private def clipParquetGroupFields(
+      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
+    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+    val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
+    structType.map { f =>
+      parquetFieldMap
+        .get(f.name)
+        .map(clipParquetType(_, f.dataType))
+        .getOrElse(toParquet.convertField(f))
+    }
+  }
+
+  def expandUDT(schema: StructType): StructType = {
+    def expand(dataType: DataType): DataType = {
+      dataType match {
+        case t: ArrayType =>
+          t.copy(elementType = expand(t.elementType))
+
+        case t: MapType =>
+          t.copy(
+            keyType = expand(t.keyType),
+            valueType = expand(t.valueType))
+
+        case t: StructType =>
+          val expandedFields = t.fields.map(f => f.copy(dataType = expand(f.dataType)))
+          t.copy(fields = expandedFields)
+
+        case t: UserDefinedType[_] =>
+          t.sqlType
+
+        case t =>
+          t
+      }
+    }
+
+    expand(schema).asInstanceOf[StructType]
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
new file mode 100644
index 0000000..0818d80
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
+import org.apache.parquet.schema.MessageType
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A [[RecordMaterializer]] for Catalyst rows.
+ *
+ * @param parquetSchema Parquet schema of the records to be read
+ * @param catalystSchema Catalyst schema of the rows to be constructed
+ */
+private[parquet] class ParquetRecordMaterializer(
+    parquetSchema: MessageType, catalystSchema: StructType)
+  extends RecordMaterializer[InternalRow] {
+
+  private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater)
+
+  override def getCurrentRecord: InternalRow = rootConverter.currentRecord
+
+  override def getRootConverter: GroupConverter = rootConverter
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
new file mode 100644
index 0000000..9dad596
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.math.{BigDecimal, BigInteger}
+import java.nio.ByteOrder
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.parquet.column.Dictionary
+import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
+import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
+ * corresponding parent container. For example, a converter for a `StructType` field may set
+ * converted values to a [[MutableRow]]; or a converter for array elements may append converted
+ * values to an [[ArrayBuffer]].
+ */
+private[parquet] trait ParentContainerUpdater {
+  /** Called before a record field is converted */
+  def start(): Unit = ()
+
+  /** Called after a record field has been converted */
+  def end(): Unit = ()
+
+  def set(value: Any): Unit = ()
+  def setBoolean(value: Boolean): Unit = set(value)
+  def setByte(value: Byte): Unit = set(value)
+  def setShort(value: Short): Unit = set(value)
+  def setInt(value: Int): Unit = set(value)
+  def setLong(value: Long): Unit = set(value)
+  def setFloat(value: Float): Unit = set(value)
+  def setDouble(value: Double): Unit = set(value)
+}
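+
+// A minimal sketch of the pattern described above (this example class is not referenced
+// elsewhere): an updater that appends every converted value to a caller-owned ArrayBuffer,
+// as an array-element converter would.
+private[parquet] class ArrayBufferUpdaterExample(buffer: ArrayBuffer[Any])
+  extends ParentContainerUpdater {
+  override def set(value: Any): Unit = buffer += value
+}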
+
+/** A no-op updater used for the root converter, which doesn't have a parent. */
+private[parquet] object NoopUpdater extends ParentContainerUpdater
+
+private[parquet] trait HasParentContainerUpdater {
+  def updater: ParentContainerUpdater
+}
+
+/**
+ * A convenient converter class for Parquet group types with a [[HasParentContainerUpdater]].
+ */
+private[parquet] abstract class ParquetGroupConverter(val updater: ParentContainerUpdater)
+  extends GroupConverter with HasParentContainerUpdater
+
+/**
+ * Parquet converter for Parquet primitive types.  Note that not all Spark SQL atomic types
+ * are handled by this converter.  Parquet primitive types are only a subset of those of Spark
+ * SQL.  For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
+ */
+private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpdater)
+  extends PrimitiveConverter with HasParentContainerUpdater {
+
+  override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
+  override def addInt(value: Int): Unit = updater.setInt(value)
+  override def addLong(value: Long): Unit = updater.setLong(value)
+  override def addFloat(value: Float): Unit = updater.setFloat(value)
+  override def addDouble(value: Double): Unit = updater.setDouble(value)
+  override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
+}
+
+/**
+ * A [[ParquetRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s.
+ * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root
+ * converter.  Take the following Parquet type as an example:
+ * {{{
+ *   message root {
+ *     required int32 f1;
+ *     optional group f2 {
+ *       required double f21;
+ *       optional binary f22 (utf8);
+ *     }
+ *   }
+ * }}}
+ * 5 converters will be created:
+ *
+ * - a root [[ParquetRowConverter]] for [[MessageType]] `root`, which contains:
+ *   - a [[ParquetPrimitiveConverter]] for required [[INT_32]] field `f1`, and
+ *   - a nested [[ParquetRowConverter]] for optional [[GroupType]] `f2`, which contains:
+ *     - a [[ParquetPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and
+ *     - a [[ParquetStringConverter]] for optional [[UTF8]] string field `f22`
+ *
+ * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
+ * any "parent" container.
+ *
+ * @param parquetType Parquet schema of Parquet records
+ * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
+ *        types should have been expanded.
+ * @param updater An updater which propagates converted field values to the parent container
+ */
+private[parquet] class ParquetRowConverter(
+    parquetType: GroupType,
+    catalystType: StructType,
+    updater: ParentContainerUpdater)
+  extends ParquetGroupConverter(updater) with Logging {
+
+  assert(
+    parquetType.getFieldCount == catalystType.length,
+    s"""Field counts of the Parquet schema and the Catalyst schema don't match:
+       |
+       |Parquet schema:
+       |$parquetType
+       |Catalyst schema:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
+  assert(
+    !catalystType.existsRecursively(_.isInstanceOf[UserDefinedType[_]]),
+    s"""User-defined types in Catalyst schema should have already been expanded:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
+  logDebug(
+    s"""Building row converter for the following schema:
+       |
+       |Parquet form:
+       |$parquetType
+       |Catalyst form:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
+  /**
+   * Updater used together with field converters within a [[ParquetRowConverter]].  It propagates
+   * converted field values to the `ordinal`-th cell in `currentRow`.
+   */
+  private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
+    override def set(value: Any): Unit = row(ordinal) = value
+    override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
+    override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
+    override def setShort(value: Short): Unit = row.setShort(ordinal, value)
+    override def setInt(value: Int): Unit = row.setInt(ordinal, value)
+    override def setLong(value: Long): Unit = row.setLong(ordinal, value)
+    override def setDouble(value: Double): Unit = row.setDouble(ordinal, value)
+    override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
+  }
+
+  private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+
+  private val unsafeProjection = UnsafeProjection.create(catalystType)
+
+  /**
+   * The [[UnsafeRow]] converted from an entire Parquet record.
+   */
+  def currentRecord: UnsafeRow = unsafeProjection(currentRow)
+
+  // Converters for each field.
+  private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
+    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
+      case ((parquetFieldType, catalystField), ordinal) =>
+        // Converted field value should be set to the `ordinal`-th cell of `currentRow`
+        newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
+    }.toArray
+  }
+
+  override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
+
+  override def end(): Unit = {
+    var i = 0
+    while (i < currentRow.numFields) {
+      fieldConverters(i).updater.end()
+      i += 1
+    }
+    updater.set(currentRow)
+  }
+
+  override def start(): Unit = {
+    var i = 0
+    while (i < currentRow.numFields) {
+      fieldConverters(i).updater.start()
+      currentRow.setNullAt(i)
+      i += 1
+    }
+  }
+
+  /**
+   * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type
+   * `catalystType`. Converted values are handled by `updater`.
+   */
+  private def newConverter(
+      parquetType: Type,
+      catalystType: DataType,
+      updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
+
+    catalystType match {
+      case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
+        new ParquetPrimitiveConverter(updater)
+
+      case ByteType =>
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit =
+            updater.setByte(value.asInstanceOf[ByteType#InternalType])
+        }
+
+      case ShortType =>
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit =
+            updater.setShort(value.asInstanceOf[ShortType#InternalType])
+        }
+
+      // For INT32 backed decimals
+      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
+        new ParquetIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
+      // For INT64 backed decimals
+      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
+        new ParquetLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
+      // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
+      case t: DecimalType
+        if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
+           parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
+        new ParquetBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
+      case t: DecimalType =>
+        throw new RuntimeException(
+          s"Unable to create Parquet converter for decimal type ${t.json} whose Parquet type is " +
+            s"$parquetType.  Parquet DECIMAL type can only be backed by INT32, INT64, " +
+            "FIXED_LEN_BYTE_ARRAY, or BINARY.")
+
+      case StringType =>
+        new ParquetStringConverter(updater)
+
+      case TimestampType =>
+        // TODO Implement `TIMESTAMP_MICROS` once parquet-mr has it.
+        new ParquetPrimitiveConverter(updater) {
+          // Converts nanosecond timestamps stored as INT96
+          override def addBinary(value: Binary): Unit = {
+            assert(
+              value.length() == 12,
+              "Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " +
+              s"but got a ${value.length()}-byte binary.")
+
+            val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
+            val timeOfDayNanos = buf.getLong
+            val julianDay = buf.getInt
+            updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
+          }
+          }
+        }
+
+      case DateType =>
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit = {
+            // DateType is not specialized in `SpecificMutableRow`, have to box it here.
+            updater.set(value.asInstanceOf[DateType#InternalType])
+          }
+        }
+
+      // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
+      // annotated by `LIST` or `MAP` should be interpreted as a required list of required
+      // elements where the element type is the type of the field.
+      case t: ArrayType if parquetType.getOriginalType != LIST =>
+        if (parquetType.isPrimitive) {
+          new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
+        } else {
+          new RepeatedGroupConverter(parquetType, t.elementType, updater)
+        }
+
+      case t: ArrayType =>
+        new ParquetArrayConverter(parquetType.asGroupType(), t, updater)
+
+      case t: MapType =>
+        new ParquetMapConverter(parquetType.asGroupType(), t, updater)
+
+      case t: StructType =>
+        new ParquetRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
+          override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
+        })
+
+      case t =>
+        throw new RuntimeException(
+          s"Unable to create Parquet converter for data type ${t.json} " +
+            s"whose Parquet type is $parquetType")
+    }
+  }
+
+  /**
+   * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
+   */
+  private final class ParquetStringConverter(updater: ParentContainerUpdater)
+    extends ParquetPrimitiveConverter(updater) {
+
+    private var expandedDictionary: Array[UTF8String] = null
+
+    override def hasDictionarySupport: Boolean = true
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
+        UTF8String.fromBytes(dictionary.decodeToBinary(i).getBytes)
+      }
+    }
+
+    override def addValueFromDictionary(dictionaryId: Int): Unit = {
+      updater.set(expandedDictionary(dictionaryId))
+    }
+
+    override def addBinary(value: Binary): Unit = {
+      // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
+      // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
+      // copying it.
+      val buffer = value.toByteBuffer
+      val offset = buffer.arrayOffset() + buffer.position()
+      val numBytes = buffer.remaining()
+      updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes))
+    }
+  }
+
+  /**
+   * Parquet converter for fixed-precision decimals.
+   */
+  private abstract class ParquetDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
+    extends ParquetPrimitiveConverter(updater) {
+
+    protected var expandedDictionary: Array[Decimal] = _
+
+    override def hasDictionarySupport: Boolean = true
+
+    override def addValueFromDictionary(dictionaryId: Int): Unit = {
+      updater.set(expandedDictionary(dictionaryId))
+    }
+
+    // Converts decimals stored as INT32
+    override def addInt(value: Int): Unit = {
+      addLong(value: Long)
+    }
+
+    // Converts decimals stored as INT64
+    override def addLong(value: Long): Unit = {
+      updater.set(decimalFromLong(value))
+    }
+
+    // Converts decimals stored as either FIXED_LENGTH_BYTE_ARRAY or BINARY
+    override def addBinary(value: Binary): Unit = {
+      updater.set(decimalFromBinary(value))
+    }
+
+    protected def decimalFromLong(value: Long): Decimal = {
+      Decimal(value, precision, scale)
+    }
+
+    protected def decimalFromBinary(value: Binary): Decimal = {
+      if (precision <= Decimal.MAX_LONG_DIGITS) {
+        // Constructs a `Decimal` with an unscaled `Long` value if possible.
+        val unscaled = ParquetRowConverter.binaryToUnscaledLong(value)
+        Decimal(unscaled, precision, scale)
+      } else {
+        // Otherwise, resorts to an unscaled `BigInteger` instead.
+        Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)
+      }
+    }
+  }
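+
+  // A worked sketch of the decimal decoding above (precision and scale are assumed): an INT32-
+  // or INT64-backed DECIMAL(5, 2) column stores the unscaled value, so a physical 12345
+  // surfaces as 123.45.
+  private def decimalFromLongExample(): Decimal = Decimal(12345L, 5, 2)  // == 123.45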
+
+  private class ParquetIntDictionaryAwareDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
+    extends ParquetDecimalConverter(precision, scale, updater) {
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+        decimalFromLong(dictionary.decodeToInt(id).toLong)
+      }
+    }
+  }
+
+  private class ParquetLongDictionaryAwareDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
+    extends ParquetDecimalConverter(precision, scale, updater) {
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+        decimalFromLong(dictionary.decodeToLong(id))
+      }
+    }
+  }
+
+  private class ParquetBinaryDictionaryAwareDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
+    extends ParquetDecimalConverter(precision, scale, updater) {
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+        decimalFromBinary(dictionary.decodeToBinary(id))
+      }
+    }
+  }
+
+  /**
+   * Parquet converter for arrays.  Spark SQL arrays are represented as Parquet lists.  Standard
+   * Parquet lists are represented as a 3-level group annotated by `LIST`:
+   * {{{
+   *   <list-repetition> group <name> (LIST) {            <-- parquetSchema points here
+   *     repeated group list {
+   *       <element-repetition> <element-type> element;
+   *     }
+   *   }
+   * }}}
+   * The `parquetSchema` constructor argument points to the outermost group.
+   *
+   * However, before this representation was standardized, some Parquet libraries/tools used
+   * non-standard formats to represent list-like structures.  Backwards-compatibility rules for
+   * handling these cases are described in the Parquet format spec.
+   *
+   * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+   */
+  private final class ParquetArrayConverter(
+      parquetSchema: GroupType,
+      catalystSchema: ArrayType,
+      updater: ParentContainerUpdater)
+    extends ParquetGroupConverter(updater) {
+
+    private var currentArray: ArrayBuffer[Any] = _
+
+    private val elementConverter: Converter = {
+      val repeatedType = parquetSchema.getType(0)
+      val elementType = catalystSchema.elementType
+      val parentName = parquetSchema.getName
+
+      if (isElementType(repeatedType, elementType, parentName)) {
+        newConverter(repeatedType, elementType, new ParentContainerUpdater {
+          override def set(value: Any): Unit = currentArray += value
+        })
+      } else {
+        new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
+      }
+    }
+
+    override def getConverter(fieldIndex: Int): Converter = elementConverter
+
+    override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
+
+    // NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for
+    // the next value.  `Row.copy()` only copies row cells; it doesn't deep-copy the objects
+    // stored in row cells.
+    override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+
+    // scalastyle:off
+    /**
+     * Returns whether the given type is the element type of a list or is a syntactic group with
+     * one field that is the element type.  This is determined by checking whether the type can be
+     * a syntactic group and by checking whether a potential syntactic group matches the expected
+     * schema.
+     * {{{
+     *   <list-repetition> group <name> (LIST) {
+     *     repeated group list {                          <-- repeatedType points here
+     *       <element-repetition> <element-type> element;
+     *     }
+     *   }
+     * }}}
+     * In short, here we handle Parquet list backwards-compatibility rules on the read path.  This
+     * method is based on `AvroIndexedRecordConverter.isElementType`.
+     *
+     * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+     */
+    // scalastyle:on
+    private def isElementType(
+        parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = {
+      (parquetRepeatedType, catalystElementType) match {
+        case (t: PrimitiveType, _) => true
+        case (t: GroupType, _) if t.getFieldCount > 1 => true
+        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true
+        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true
+        case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
+        case _ => false
+      }
+    }
+
+    /** Array element converter */
+    private final class ElementConverter(parquetType: Type, catalystType: DataType)
+      extends GroupConverter {
+
+      private var currentElement: Any = _
+
+      private val converter = newConverter(parquetType, catalystType, new ParentContainerUpdater {
+        override def set(value: Any): Unit = currentElement = value
+      })
+
+      override def getConverter(fieldIndex: Int): Converter = converter
+
+      override def end(): Unit = currentArray += currentElement
+
+      override def start(): Unit = currentElement = null
+    }
+  }
+
+  /** Parquet converter for maps */
+  private final class ParquetMapConverter(
+      parquetType: GroupType,
+      catalystType: MapType,
+      updater: ParentContainerUpdater)
+    extends ParquetGroupConverter(updater) {
+
+    private var currentKeys: ArrayBuffer[Any] = _
+    private var currentValues: ArrayBuffer[Any] = _
+
+    private val keyValueConverter = {
+      val repeatedType = parquetType.getType(0).asGroupType()
+      new KeyValueConverter(
+        repeatedType.getType(0),
+        repeatedType.getType(1),
+        catalystType.keyType,
+        catalystType.valueType)
+    }
+
+    override def getConverter(fieldIndex: Int): Converter = keyValueConverter
+
+    override def end(): Unit =
+      updater.set(ArrayBasedMapData(currentKeys.toArray, currentValues.toArray))
+
+    // NOTE: We can't reuse the mutable key/value buffers here and must instantiate new ones for
+    // the next value.  `Row.copy()` only copies row cells; it doesn't deep-copy the objects
+    // stored in row cells.
+    override def start(): Unit = {
+      currentKeys = ArrayBuffer.empty[Any]
+      currentValues = ArrayBuffer.empty[Any]
+    }
+
+    /** Parquet converter for key-value pairs within the map. */
+    private final class KeyValueConverter(
+        parquetKeyType: Type,
+        parquetValueType: Type,
+        catalystKeyType: DataType,
+        catalystValueType: DataType)
+      extends GroupConverter {
+
+      private var currentKey: Any = _
+
+      private var currentValue: Any = _
+
+      private val converters = Array(
+        // Converter for keys
+        newConverter(parquetKeyType, catalystKeyType, new ParentContainerUpdater {
+          override def set(value: Any): Unit = currentKey = value
+        }),
+
+        // Converter for values
+        newConverter(parquetValueType, catalystValueType, new ParentContainerUpdater {
+          override def set(value: Any): Unit = currentValue = value
+        }))
+
+      override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
+
+      override def end(): Unit = {
+        currentKeys += currentKey
+        currentValues += currentValue
+      }
+
+      override def start(): Unit = {
+        currentKey = null
+        currentValue = null
+      }
+    }
+  }
+
+  private trait RepeatedConverter {
+    private var currentArray: ArrayBuffer[Any] = _
+
+    protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
+      override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+      override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
+      override def set(value: Any): Unit = currentArray += value
+    }
+  }
+
+  /**
+   * A primitive converter for converting unannotated repeated primitive values to required arrays
+   * of required primitive values.
+   */
+  private final class RepeatedPrimitiveConverter(
+      parquetType: Type,
+      catalystType: DataType,
+      parentUpdater: ParentContainerUpdater)
+    extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater {
+
+    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
+
+    private val elementConverter: PrimitiveConverter =
+      newConverter(parquetType, catalystType, updater).asPrimitiveConverter()
+
+    override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value)
+    override def addInt(value: Int): Unit = elementConverter.addInt(value)
+    override def addLong(value: Long): Unit = elementConverter.addLong(value)
+    override def addFloat(value: Float): Unit = elementConverter.addFloat(value)
+    override def addDouble(value: Double): Unit = elementConverter.addDouble(value)
+    override def addBinary(value: Binary): Unit = elementConverter.addBinary(value)
+
+    override def setDictionary(dict: Dictionary): Unit = elementConverter.setDictionary(dict)
+    override def hasDictionarySupport: Boolean = elementConverter.hasDictionarySupport
+    override def addValueFromDictionary(id: Int): Unit = elementConverter.addValueFromDictionary(id)
+  }
+
+  /**
+   * A group converter for converting unannotated repeated group values to required arrays of
+   * required struct values.
+   */
+  private final class RepeatedGroupConverter(
+      parquetType: Type,
+      catalystType: DataType,
+      parentUpdater: ParentContainerUpdater)
+    extends GroupConverter with HasParentContainerUpdater with RepeatedConverter {
+
+    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
+
+    private val elementConverter: GroupConverter =
+      newConverter(parquetType, catalystType, updater).asGroupConverter()
+
+    override def getConverter(field: Int): Converter = elementConverter.getConverter(field)
+    override def end(): Unit = elementConverter.end()
+    override def start(): Unit = elementConverter.start()
+  }
+}
+
+private[parquet] object ParquetRowConverter {
+  def binaryToUnscaledLong(binary: Binary): Long = {
+    // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
+    // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
+    // copying it.
+    val buffer = binary.toByteBuffer
+    val bytes = buffer.array()
+    val start = buffer.arrayOffset() + buffer.position()
+    val end = buffer.arrayOffset() + buffer.limit()
+
+    var unscaled = 0L
+    var i = start
+
+    while (i < end) {
+      unscaled = (unscaled << 8) | (bytes(i) & 0xff)
+      i += 1
+    }
+
+    val bits = 8 * (end - start)
+    unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+    unscaled
+  }
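+
+  // A worked sketch of the big-endian two's-complement decoding above (inputs are made up):
+  // Array(0x01, 0x2C) decodes to 300L, and a single 0xFF byte sign-extends to -1L.
+  private def binaryToUnscaledLongExample(): Seq[Long] = Seq(
+    binaryToUnscaledLong(Binary.fromByteArray(Array[Byte](0x01, 0x2C))),   // 300L
+    binaryToUnscaledLong(Binary.fromByteArray(Array[Byte](0xFF.toByte))))  // -1L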
+
+  def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
+    assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
+      s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
+    val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
+    val timeOfDayNanos = buffer.getLong
+    val julianDay = buffer.getInt
+    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
new file mode 100644
index 0000000..bcf535d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
+import org.apache.parquet.schema.Type.Repetition._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.maxPrecisionForBytes
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]]
+ * and vice versa.
+ *
+ * Parquet format backwards-compatibility rules are respected when converting Parquet
+ * [[MessageType]] schemas.
+ *
+ * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+ * @constructor
+ * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
+ *        [[StringType]] fields when converting a Parquet [[MessageType]] to a Spark SQL
+ *        [[StructType]].  This argument only affects the Parquet read path.
+ * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
+ *        [[TimestampType]] fields when converting a Parquet [[MessageType]] to a Spark SQL
+ *        [[StructType]].  Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
+ *        has optional nanosecond precision, but different from `TIME_MILLIS` and `TIMESTAMP_MILLIS`
+ *        described in the Parquet format spec.  This argument only affects the Parquet read path.
+ * @param writeLegacyParquetFormat Whether to use the legacy Parquet format compatible with Spark
+ *        1.4 and prior versions when converting a Catalyst [[StructType]] to a Parquet
+ *        [[MessageType]].  When set to false, use the standard format defined in the
+ *        parquet-format spec.  This argument only affects the Parquet write path.
+ */
+private[parquet] class ParquetSchemaConverter(
+    assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
+    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+    writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {
+
+  def this(conf: SQLConf) = this(
+    assumeBinaryIsString = conf.isParquetBinaryAsString,
+    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
+    writeLegacyParquetFormat = conf.writeLegacyParquetFormat)
+
+  def this(conf: Configuration) = this(
+    assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
+    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+    writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean)
+
+  /**
+   * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
+   */
+  def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())
+
+  private def convert(parquetSchema: GroupType): StructType = {
+    val fields = parquetSchema.getFields.asScala.map { field =>
+      field.getRepetition match {
+        case OPTIONAL =>
+          StructField(field.getName, convertField(field), nullable = true)
+
+        case REQUIRED =>
+          StructField(field.getName, convertField(field), nullable = false)
+
+        case REPEATED =>
+          // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
+          // annotated by `LIST` or `MAP` should be interpreted as a required list of required
+          // elements where the element type is the type of the field.
+          val arrayType = ArrayType(convertField(field), containsNull = false)
+          StructField(field.getName, arrayType, nullable = false)
+      }
+    }
+
+    StructType(fields)
+  }
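+
+  // A minimal sketch (hypothetical schema string, not referenced elsewhere): OPTIONAL vs.
+  // REQUIRED repetition maps directly onto StructField nullability.
+  private def convertExample(): StructType = {
+    val parquetSchema = MessageTypeParser.parseMessageType(
+      "message spark_schema { required int32 id; optional binary name (UTF8); }")
+    // => StructType(StructField("id", IntegerType, nullable = false),
+    //               StructField("name", StringType, nullable = true))
+    convert(parquetSchema)
+  }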
+
+  /**
+   * Converts a Parquet [[Type]] to a Spark SQL [[DataType]].
+   */
+  def convertField(parquetType: Type): DataType = parquetType match {
+    case t: PrimitiveType => convertPrimitiveField(t)
+    case t: GroupType => convertGroupField(t.asGroupType())
+  }
+
+  private def convertPrimitiveField(field: PrimitiveType): DataType = {
+    val typeName = field.getPrimitiveTypeName
+    val originalType = field.getOriginalType
+
+    def typeString =
+      if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
+
+    def typeNotSupported() =
+      throw new AnalysisException(s"Parquet type not supported: $typeString")
+
+    def typeNotImplemented() =
+      throw new AnalysisException(s"Parquet type not yet supported: $typeString")
+
+    def illegalType() =
+      throw new AnalysisException(s"Illegal Parquet type: $typeString")
+
+    // When maxPrecision = -1, we skip precision range check, and always respect the precision
+    // specified in field.getDecimalMetadata.  This is useful when interpreting decimal types stored
+    // as binaries with variable lengths.
+    def makeDecimalType(maxPrecision: Int = -1): DecimalType = {
+      val precision = field.getDecimalMetadata.getPrecision
+      val scale = field.getDecimalMetadata.getScale
+
+      ParquetSchemaConverter.checkConversionRequirement(
+        maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
+        s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")
+
+      DecimalType(precision, scale)
+    }
+
+    typeName match {
+      case BOOLEAN => BooleanType
+
+      case FLOAT => FloatType
+
+      case DOUBLE => DoubleType
+
+      case INT32 =>
+        originalType match {
+          case INT_8 => ByteType
+          case INT_16 => ShortType
+          case INT_32 | null => IntegerType
+          case DATE => DateType
+          case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS)
+          case UINT_8 => typeNotSupported()
+          case UINT_16 => typeNotSupported()
+          case UINT_32 => typeNotSupported()
+          case TIME_MILLIS => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case INT64 =>
+        originalType match {
+          case INT_64 | null => LongType
+          case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
+          case UINT_64 => typeNotSupported()
+          case TIMESTAMP_MILLIS => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case INT96 =>
+        ParquetSchemaConverter.checkConversionRequirement(
+          assumeInt96IsTimestamp,
+          "INT96 is not supported unless it's interpreted as timestamp. " +
+            s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
+        TimestampType
+
+      case BINARY =>
+        originalType match {
+          case UTF8 | ENUM | JSON => StringType
+          case null if assumeBinaryIsString => StringType
+          case null => BinaryType
+          case BSON => BinaryType
+          case DECIMAL => makeDecimalType()
+          case _ => illegalType()
+        }
+
+      case FIXED_LEN_BYTE_ARRAY =>
+        originalType match {
+          case DECIMAL => makeDecimalType(maxPrecisionForBytes(field.getTypeLength))
+          case INTERVAL => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case _ => illegalType()
+    }
+  }
+
+  private def convertGroupField(field: GroupType): DataType = {
+    Option(field.getOriginalType).fold(convert(field): DataType) {
+      // A Parquet list is represented as a 3-level structure:
+      //
+      //   <list-repetition> group <name> (LIST) {
+      //     repeated group list {
+      //       <element-repetition> <element-type> element;
+      //     }
+      //   }
+      //
+      // However, according to the most recent Parquet format spec (not yet released as of this
+      // writing), some 2-level structures are also recognized for backwards-compatibility.  Thus,
+      // we need to check whether the 2nd level or the 3rd level refers to the list element type.
+      //
+      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+      case LIST =>
+        ParquetSchemaConverter.checkConversionRequirement(
+          field.getFieldCount == 1, s"Invalid list type $field")
+
+        val repeatedType = field.getType(0)
+        ParquetSchemaConverter.checkConversionRequirement(
+          repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
+
+        if (isElementType(repeatedType, field.getName)) {
+          ArrayType(convertField(repeatedType), containsNull = false)
+        } else {
+          val elementType = repeatedType.asGroupType().getType(0)
+          val optional = elementType.isRepetition(OPTIONAL)
+          ArrayType(convertField(elementType), containsNull = optional)
+        }
+
+      // scalastyle:off
+      // `MAP_KEY_VALUE` is for backwards-compatibility
+      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
+      // scalastyle:on
+      case MAP | MAP_KEY_VALUE =>
+        ParquetSchemaConverter.checkConversionRequirement(
+          field.getFieldCount == 1 && !field.getType(0).isPrimitive,
+          s"Invalid map type: $field")
+
+        val keyValueType = field.getType(0).asGroupType()
+        ParquetSchemaConverter.checkConversionRequirement(
+          keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
+          s"Invalid map type: $field")
+
+        val keyType = keyValueType.getType(0)
+        ParquetSchemaConverter.checkConversionRequirement(
+          keyType.isPrimitive,
+          s"Map key type is expected to be a primitive type, but found: $keyType")
+
+        val valueType = keyValueType.getType(1)
+        val valueOptional = valueType.isRepetition(OPTIONAL)
+        MapType(
+          convertField(keyType),
+          convertField(valueType),
+          valueContainsNull = valueOptional)
+
+      case _ =>
+        throw new AnalysisException(s"Unrecognized Parquet type: $field")
+    }
+  }
+
+  // scalastyle:off
+  // Here we implement Parquet LIST backwards-compatibility rules.
+  // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+  // scalastyle:on
+  private def isElementType(repeatedType: Type, parentName: String): Boolean = {
+    {
+      // For legacy 2-level list types with primitive element type, e.g.:
+      //
+      //    // List<Integer> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated int32 element;
+      //    }
+      //
+      repeatedType.isPrimitive
+    } || {
+      // For legacy 2-level list types whose element type is a group type with 2 or more fields,
+      // e.g.:
+      //
+      //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group element {
+      //        required binary str (UTF8);
+      //        required int32 num;
+      //      };
+      //    }
+      //
+      repeatedType.asGroupType().getFieldCount > 1
+    } || {
+      // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
+      //
+      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group array {
+      //        required binary str (UTF8);
+      //      };
+      //    }
+      //
+      repeatedType.getName == "array"
+    } || {
+      // For Parquet data generated by parquet-thrift, e.g.:
+      //
+      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group my_list_tuple {
+      //        required binary str (UTF8);
+      //      };
+      //    }
+      //
+      repeatedType.getName == s"${parentName}_tuple"
+    }
+  }
+
+  /**
+   * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
+   */
+  def convert(catalystSchema: StructType): MessageType = {
+    Types
+      .buildMessage()
+      .addFields(catalystSchema.map(convertField): _*)
+      .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+  }
+
+  /**
+   * Converts a Spark SQL [[StructField]] to a Parquet [[Type]].
+   */
+  def convertField(field: StructField): Type = {
+    convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
+  }
+
+  private def convertField(field: StructField, repetition: Type.Repetition): Type = {
+    ParquetSchemaConverter.checkFieldName(field.name)
+
+    field.dataType match {
+      // ===================
+      // Simple atomic types
+      // ===================
+
+      case BooleanType =>
+        Types.primitive(BOOLEAN, repetition).named(field.name)
+
+      case ByteType =>
+        Types.primitive(INT32, repetition).as(INT_8).named(field.name)
+
+      case ShortType =>
+        Types.primitive(INT32, repetition).as(INT_16).named(field.name)
+
+      case IntegerType =>
+        Types.primitive(INT32, repetition).named(field.name)
+
+      case LongType =>
+        Types.primitive(INT64, repetition).named(field.name)
+
+      case FloatType =>
+        Types.primitive(FLOAT, repetition).named(field.name)
+
+      case DoubleType =>
+        Types.primitive(DOUBLE, repetition).named(field.name)
+
+      case StringType =>
+        Types.primitive(BINARY, repetition).as(UTF8).named(field.name)
+
+      case DateType =>
+        Types.primitive(INT32, repetition).as(DATE).named(field.name)
+
+      // NOTE: Spark SQL TimestampType is NOT a well-defined type in the Parquet format spec.
+      //
+      // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
+      // timestamps in Impala for historical reasons.  It is not recommended for any other types and
+      // will probably be deprecated in some future version of the parquet-format spec.  That's why
+      // the parquet-format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS`, which are
+      // both logical types annotating `INT64`.
+      //
+      // Originally, Spark SQL used the same nanosecond timestamp type as Impala and Hive.  Starting
+      // from Spark 1.5.0, we switched to a timestamp type with 100 ns precision so that a timestamp
+      // can be stored in a `Long`.  This design decision is subject to change though; for example,
+      // we may switch to microsecond precision in the future.
+      //
+      // For Parquet, we plan to write all `TimestampType` values as `TIMESTAMP_MICROS`, but this is
+      // not implemented yet because parquet-mr 1.7.0 (the version we are currently using) does not
+      // support `TIMESTAMP_MICROS`.
+      //
+      // TODO Convert to `TIMESTAMP_MICROS` once parquet-mr supports it.
+      case TimestampType =>
+        Types.primitive(INT96, repetition).named(field.name)
+
+      case BinaryType =>
+        Types.primitive(BINARY, repetition).named(field.name)
+
+      // ======================
+      // Decimals (legacy mode)
+      // ======================
+
+      // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
+      // always store decimals in fixed-length byte arrays.  To keep compatibility with these older
+      // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
+      // by `DECIMAL`.
+      case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
+        Types
+          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .length(ParquetSchemaConverter.minBytesForPrecision(precision))
+          .named(field.name)
+
+      // ========================
+      // Decimals (standard mode)
+      // ========================
+
+      // Uses INT32 for 1 <= precision <= 9
+      case DecimalType.Fixed(precision, scale)
+          if precision <= Decimal.MAX_INT_DIGITS && !writeLegacyParquetFormat =>
+        Types
+          .primitive(INT32, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .named(field.name)
+
+      // Uses INT64 for 1 <= precision <= 18
+      case DecimalType.Fixed(precision, scale)
+          if precision <= Decimal.MAX_LONG_DIGITS && !writeLegacyParquetFormat =>
+        Types
+          .primitive(INT64, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .named(field.name)
+
+      // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
+      case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
+        Types
+          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .length(ParquetSchemaConverter.minBytesForPrecision(precision))
+          .named(field.name)
+
+      // ===================================
+      // ArrayType and MapType (legacy mode)
+      // ===================================
+
+      // Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level
+      // `LIST` structure.  This behavior is somewhat a hybrid of parquet-hive and parquet-avro
+      // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element
+      // field name "array" is borrowed from parquet-avro.
+      case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat =>
+        // <list-repetition> group <name> (LIST) {
+        //   optional group bag {
+        //     repeated <element-type> array;
+        //   }
+        // }
+        ConversionPatterns.listType(
+          repetition,
+          field.name,
+          Types
+            .buildGroup(REPEATED)
+            // "array_element" is the name chosen by parquet-hive (1.7.0 and 
prior version)
+            .addField(convertField(StructField("array", elementType, 
nullable)))
+            .named("bag"))
+
+      // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
+      // LIST structure.  This behavior mimics parquet-avro (1.6.0rc3).  Note that this case is
+      // covered by the backwards-compatibility rules implemented in `isElementType()`.
+      case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat =>
+        // <list-repetition> group <name> (LIST) {
+        //   repeated <element-type> element;
+        // }
+        ConversionPatterns.listType(
+          repetition,
+          field.name,
+          // "array" is the name chosen by parquet-avro (1.7.0 and prior 
version)
+          convertField(StructField("array", elementType, nullable), REPEATED))
+
+      // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
+      // MAP_KEY_VALUE.  This is covered by `convertGroupField(field: GroupType): DataType`.
+      case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat =>
+        // <map-repetition> group <name> (MAP) {
+        //   repeated group map (MAP_KEY_VALUE) {
+        //     required <key-type> key;
+        //     <value-repetition> <value-type> value;
+        //   }
+        // }
+        ConversionPatterns.mapType(
+          repetition,
+          field.name,
+          convertField(StructField("key", keyType, nullable = false)),
+          convertField(StructField("value", valueType, valueContainsNull)))
+
+      // =====================================
+      // ArrayType and MapType (standard mode)
+      // =====================================
+
+      case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat =>
+        // <list-repetition> group <name> (LIST) {
+        //   repeated group list {
+        //     <element-repetition> <element-type> element;
+        //   }
+        // }
+        Types
+          .buildGroup(repetition).as(LIST)
+          .addField(
+            Types.repeatedGroup()
+              .addField(convertField(StructField("element", elementType, 
containsNull)))
+              .named("list"))
+          .named(field.name)
+
+      case MapType(keyType, valueType, valueContainsNull) =>
+        // <map-repetition> group <name> (MAP) {
+        //   repeated group key_value {
+        //     required <key-type> key;
+        //     <value-repetition> <value-type> value;
+        //   }
+        // }
+        Types
+          .buildGroup(repetition).as(MAP)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(convertField(StructField("key", keyType, nullable = 
false)))
+              .addField(convertField(StructField("value", valueType, 
valueContainsNull)))
+              .named("key_value"))
+          .named(field.name)
+
+      // ===========
+      // Other types
+      // ===========
+
+      case StructType(fields) =>
+        fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
+          builder.addField(convertField(field))
+        }.named(field.name)
+
+      case udt: UserDefinedType[_] =>
+        convertField(field.copy(dataType = udt.sqlType))
+
+      case _ =>
+        throw new AnalysisException(s"Unsupported data type $field.dataType")
+    }
+  }
+}
+
+private[parquet] object ParquetSchemaConverter {
+  val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
+
+  // !! HACK ALERT !!
+  //
+  // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing 
empty GroupType,
+  // which prevents us to avoid selecting any columns for queries like `SELECT 
COUNT(*) FROM t`.
+  // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
+  //
+  // To workaround this problem, here we first construct a `MessageType` with 
a single dummy
+  // field, and then remove the field to obtain an empty `MessageType`.
+  //
+  // TODO Reverts this change after upgrading parquet-mr to 1.8.2+
+  val EMPTY_MESSAGE = Types
+      .buildMessage()
+      .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
+      .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+  EMPTY_MESSAGE.getFields.clear()
+
+  def checkFieldName(name: String): Unit = {
+    // ,;{}()\n\t= and space are special characters in Parquet schema
+    checkConversionRequirement(
+      !name.matches(".*[ ,;{}()\n\t=].*"),
+      s"""Attribute name "$name" contains invalid character(s) among " 
,;{}()\\n\\t=".
+         |Please use alias to rename it.
+       """.stripMargin.split("\n").mkString(" ").trim)
+  }
+
+  def checkFieldNames(schema: StructType): StructType = {
+    schema.fieldNames.foreach(checkFieldName)
+    schema
+  }
+
+  def checkConversionRequirement(f: => Boolean, message: String): Unit = {
+    if (!f) {
+      throw new AnalysisException(message)
+    }
+  }
+
+  private def computeMinBytesForPrecision(precision: Int): Int = {
+    var numBytes = 1
+    while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
+      numBytes += 1
+    }
+    numBytes
+  }
+
+  // Returns the minimum number of bytes needed to store a decimal with a given `precision`.
+  val minBytesForPrecision = Array.tabulate[Int](39)(computeMinBytesForPrecision)
+
+  // Max precision of a decimal value stored in `numBytes` bytes
+  def maxPrecisionForBytes(numBytes: Int): Int = {
+    Math.round(                               // convert double to long
+      Math.floor(Math.log10(                  // number of base-10 digits
+        Math.pow(2, 8 * numBytes - 1) - 1)))  // max value stored in numBytes
+      .asInstanceOf[Int]
+  }
+}
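
For reference, a minimal usage sketch of the converter added above (not part of the patch). The constructor arguments mirror the ones used in ParquetSchemaSuite below; since ParquetSchemaConverter is private[parquet], the snippet assumes it runs inside the org.apache.spark.sql.execution.datasources.parquet package. The schema text in the comment is what the standard-mode rules in convertField() should produce, written out by hand rather than generated.

    import org.apache.spark.sql.types._

    // Standard (non-legacy) output mode, as exercised by ParquetSchemaSuite.
    val converter = new ParquetSchemaConverter(
      assumeBinaryIsString = false,
      assumeInt96IsTimestamp = true,
      writeLegacyParquetFormat = false)

    val catalystSchema = new StructType()
      .add("id", LongType, nullable = false)
      .add("tags", ArrayType(StringType, containsNull = true))

    // Expected shape of converter.convert(catalystSchema), following the
    // standard-mode ArrayType case above (3-level LIST layout):
    //
    //   message spark_schema {
    //     required int64 id;
    //     optional group tags (LIST) {
    //       repeated group list {
    //         optional binary element (UTF8);
    //       }
    //     }
    //   }
    println(converter.convert(catalystSchema))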

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 92f2db3..fc9ce6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -362,7 +362,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
 
-      val expectedSchema = new CatalystSchemaConverter().convert(schema)
+      val expectedSchema = new ParquetSchemaConverter().convert(schema)
       val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema
 
       actualSchema.checkContains(expectedSchema)
@@ -432,7 +432,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       """.stripMargin)
 
     withTempPath { location =>
-      val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
+      val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
       val path = new Path(location.getCanonicalPath)
       val conf = spark.sessionState.newHadoopConf()
       writeMetadata(parquetSchema, path, conf, extraMetadata)
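
As an aside on the backwards-compatibility rules implemented in isElementType() above, here is a sketch (not part of the patch) of one of the legacy layouts it has to recognize. The schema text is the parquet-avro example from the comment in isElementType(), and MessageTypeParser is the same parser used by the schema suite below; the Catalyst type noted in the comment follows from the LIST branch of convertGroupField(), not from running this code.

    import org.apache.parquet.schema.MessageTypeParser

    // Legacy 2-level list written by parquet-avro (< 1.6.0): the repeated group is
    // named "array", so isElementType() returns true and the repeated group itself
    // becomes the element type.  The resulting Catalyst type would be
    //   ArrayType(StructType(StructField("str", StringType, nullable = false) :: Nil),
    //             containsNull = false)
    val legacyAvroList = MessageTypeParser.parseMessageType(
      """message avro_style {
        |  optional group my_list (LIST) {
        |    repeated group array {
        |      required binary str (UTF8);
        |    }
        |  }
        |}
      """.stripMargin)

    println(legacyAvroList)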

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index b4fd0ef..83d1001 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -574,7 +574,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
   test("expand UDT in StructType") {
     val schema = new StructType().add("n", new NestedStructUDT, nullable = true)
     val expected = new StructType().add("n", new NestedStructUDT().sqlType, nullable = true)
-    assert(CatalystReadSupport.expandUDT(schema) === expected)
+    assert(ParquetReadSupport.expandUDT(schema) === expected)
   }
 
   test("expand UDT in ArrayType") {
@@ -592,7 +592,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
         containsNull = false),
       nullable = true)
 
-    assert(CatalystReadSupport.expandUDT(schema) === expected)
+    assert(ParquetReadSupport.expandUDT(schema) === expected)
   }
 
   test("expand UDT in MapType") {
@@ -612,7 +612,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
         valueContainsNull = false),
       nullable = true)
 
-    assert(CatalystReadSupport.expandUDT(schema) === expected)
+    assert(ParquetReadSupport.expandUDT(schema) === expected)
   }
 
   test("returning batch for wide table") {

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 865bfa2..8a980a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -54,7 +54,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean): Unit = {
-    val converter = new CatalystSchemaConverter(
+    val converter = new ParquetSchemaConverter(
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
       writeLegacyParquetFormat = writeLegacyParquetFormat)
@@ -78,7 +78,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean): Unit = {
-    val converter = new CatalystSchemaConverter(
+    val converter = new ParquetSchemaConverter(
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
       writeLegacyParquetFormat = writeLegacyParquetFormat)
@@ -1062,7 +1062,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       catalystSchema: StructType,
       expectedSchema: MessageType): Unit = {
     test(s"Clipping - $testName") {
-      val actual = CatalystReadSupport.clipParquetSchema(
+      val actual = ParquetReadSupport.clipParquetSchema(
         MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
 
       try {
@@ -1424,7 +1424,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
 
     catalystSchema = new StructType(),
 
-    expectedSchema = CatalystSchemaConverter.EMPTY_MESSAGE)
+    expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE)
 
   testSchemaClipping(
     "disjoint field sets",

http://git-wip-us.apache.org/repos/asf/spark/blob/865e7cc3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 1953d6f..9fb34e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -124,8 +124,8 @@ private[sql] trait ParquetTest extends SQLTestUtils {
 
   protected def writeMetadata(
       schema: StructType, path: Path, configuration: Configuration): Unit = {
-    val parquetSchema = new CatalystSchemaConverter().convert(schema)
-    val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
+    val parquetSchema = new ParquetSchemaConverter().convert(schema)
+    val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
     val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}"
     val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy)
     val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava)
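
Finally, a sketch (not part of the patch) of what the footer metadata written by writeMetadata() above carries: the Catalyst schema serialized as JSON under ParquetReadSupport.SPARK_METADATA_KEY. Like the first sketch, this assumes it runs inside the org.apache.spark.sql.execution.datasources.parquet package, and the JSON in the comment is an illustrative StructType.json rendering rather than captured output.

    import org.apache.spark.sql.types._

    val schema = new StructType().add("id", LongType, nullable = false)

    // The single extra key-value entry stored in the Parquet footer:
    val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schema.json)

    // schema.json renders roughly as:
    //   {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}
    // which the read path can use to recover the original Catalyst schema.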

