Repository: spark
Updated Branches:
  refs/heads/branch-1.0 ff4d5a234 -> 357d16bcd


http://git-wip-us.apache.org/repos/asf/spark/blob/357d16bc/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
new file mode 100644
index 0000000..f904636
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.IOException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.Job
+
+import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
+import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
+import parquet.hadoop.util.ContextUtil
+import parquet.schema.{Type => ParquetType, PrimitiveType => 
ParquetPrimitiveType, MessageType}
+import parquet.schema.{GroupType => ParquetGroupType, OriginalType => 
ParquetOriginalType, ConversionPatterns}
+import parquet.schema.PrimitiveType.{PrimitiveTypeName => 
ParquetPrimitiveTypeName}
+import parquet.schema.Type.Repetition
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Attribute}
+import org.apache.spark.sql.catalyst.types._
+
+// Implicits
+import scala.collection.JavaConversions._
+
+private[parquet] object ParquetTypesConverter extends Logging {
+  /** Tells whether the runtime class of `ctype` is `PrimitiveType` or one of its subclasses. */
+  def isPrimitiveType(ctype: DataType): Boolean = {
+    val primitiveClass = classOf[PrimitiveType]
+    primitiveClass.isAssignableFrom(ctype.getClass)
+  }
+
+  /**
+   * Maps a Parquet primitive type name to the Catalyst type used to represent it.
+   *
+   * `INT96` and any type name without a case below abort via `sys.error`.
+   *
+   * @param parquetType The Parquet primitive type name to translate.
+   * @return The Catalyst type selected for `parquetType`.
+   */
+  def toPrimitiveDataType(parquetType: ParquetPrimitiveTypeName): DataType = {
+    parquetType match {
+      // Fixed-width numeric and boolean types
+      case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
+      case ParquetPrimitiveTypeName.INT32 => IntegerType
+      case ParquetPrimitiveTypeName.INT64 => LongType
+      case ParquetPrimitiveTypeName.FLOAT => FloatType
+      case ParquetPrimitiveTypeName.DOUBLE => DoubleType
+      // Byte-oriented types
+      case ParquetPrimitiveTypeName.BINARY => StringType
+      case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => ArrayType(ByteType)
+      // Explicitly rejected
+      case ParquetPrimitiveTypeName.INT96 =>
+        // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
+        sys.error("Potential loss of precision: cannot convert INT96")
+      case _ =>
+        sys.error(s"Unsupported parquet datatype $parquetType")
+    }
+  }
+
+  /**
+   * Converts a given Parquet `Type` into the corresponding
+   * [[org.apache.spark.sql.catalyst.types.DataType]].
+   *
+   * We apply the following conversion rules:
+   * <ul>
+   *   <li> Primitive types are converted to the corresponding primitive type.</li>
+   *   <li> Group types that have a single field that is itself a group, which 
has repetition
+   *        level `REPEATED`, are treated as follows:<ul>
+   *          <li> If the nested group has name `values`, the surrounding 
group is converted
+   *               into an [[ArrayType]] with the corresponding field type 
(primitive or
+   *               complex) as element type.</li>
+   *          <li> If the nested group has name `map` and two fields (named 
`key` and `value`),
+   *               the surrounding group is converted into a [[MapType]]
+   *               with the corresponding key and value (value possibly 
complex) types.
+   *               Note that we currently assume map values are not nullable.</li></ul></li>
+   *   <li> Other group types are converted into a [[StructType]] with the corresponding
+   *        field types.</li>
+   * </ul>
+   * Note that fields are determined to be `nullable` if and only if their 
Parquet repetition
+   * level is not `REQUIRED`.
+   *
+   * @param parquetType The type to convert.
+   * @return The corresponding Catalyst type.
+   */
+  def toDataType(parquetType: ParquetType): DataType = {
+    // True when `groupType` has exactly one field, that field is a REPEATED group named
+    // MAP_SCHEMA_NAME, and it holds exactly two fields named MAP_KEY_SCHEMA_NAME and
+    // MAP_VALUE_SCHEMA_NAME, in that order.
+    def correspondsToMap(groupType: ParquetGroupType): Boolean = {
+      if (groupType.getFieldCount != 1 || 
groupType.getFields.apply(0).isPrimitive) {
+        false
+      } else {
+        // This mostly follows the convention in ``parquet.schema.ConversionPatterns``
+        val keyValueGroup = groupType.getFields.apply(0).asGroupType()
+        keyValueGroup.getRepetition == Repetition.REPEATED &&
+          keyValueGroup.getName == CatalystConverter.MAP_SCHEMA_NAME &&
+          keyValueGroup.getFieldCount == 2 &&
+          keyValueGroup.getFields.apply(0).getName == 
CatalystConverter.MAP_KEY_SCHEMA_NAME &&
+          keyValueGroup.getFields.apply(1).getName == 
CatalystConverter.MAP_VALUE_SCHEMA_NAME
+      }
+    }
+
+    // True when `groupType` has exactly one field, named ARRAY_ELEMENTS_SCHEMA_NAME, whose
+    // repetition is REPEATED. Unlike the map check, the field may be primitive or a group.
+    def correspondsToArray(groupType: ParquetGroupType): Boolean = {
+      groupType.getFieldCount == 1 &&
+        groupType.getFieldName(0) == 
CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME &&
+        groupType.getFields.apply(0).getRepetition == Repetition.REPEATED
+    }
+
+    if (parquetType.isPrimitive) {
+      // Primitive types are translated purely by their primitive type name.
+      toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName)
+    } else {
+      val groupType = parquetType.asGroupType()
+      parquetType.getOriginalType match {
+        // if the schema was constructed programmatically there may be hints how to convert
+        // it inside the metadata via the OriginalType field
+        case ParquetOriginalType.LIST => { // TODO: check enums!
+          // The sole field of a LIST group determines the array's element type.
+          assert(groupType.getFieldCount == 1)
+          val field = groupType.getFields.apply(0)
+          new ArrayType(toDataType(field))
+        }
+        case ParquetOriginalType.MAP => {
+          // A MAP group must wrap a nested group holding exactly the key and value fields,
+          // both of which are asserted to be REQUIRED.
+          assert(
+            !groupType.getFields.apply(0).isPrimitive,
+            "Parquet Map type malformatted: expected nested group for map!")
+          val keyValueGroup = groupType.getFields.apply(0).asGroupType()
+          assert(
+            keyValueGroup.getFieldCount == 2,
+            "Parquet Map type malformatted: nested group should have 2 (key, 
value) fields!")
+          val keyType = toDataType(keyValueGroup.getFields.apply(0))
+          assert(keyValueGroup.getFields.apply(0).getRepetition == 
Repetition.REQUIRED)
+          val valueType = toDataType(keyValueGroup.getFields.apply(1))
+          assert(keyValueGroup.getFields.apply(1).getRepetition == 
Repetition.REQUIRED)
+          new MapType(keyType, valueType)
+        }
+        case _ => {
+          // No usable OriginalType hint: fall back to recognizing the group's shape.
+          // Note: the order of these checks is important!
+          if (correspondsToMap(groupType)) { // MapType
+            val keyValueGroup = groupType.getFields.apply(0).asGroupType()
+            val keyType = toDataType(keyValueGroup.getFields.apply(0))
+            assert(keyValueGroup.getFields.apply(0).getRepetition == 
Repetition.REQUIRED)
+            val valueType = toDataType(keyValueGroup.getFields.apply(1))
+            assert(keyValueGroup.getFields.apply(1).getRepetition == 
Repetition.REQUIRED)
+            new MapType(keyType, valueType)
+          } else if (correspondsToArray(groupType)) { // ArrayType
+            val elementType = toDataType(groupType.getFields.apply(0))
+            new ArrayType(elementType)
+          } else { // everything else: StructType
+            // Each Parquet field becomes a struct field that is nullable unless REQUIRED.
+            val fields = groupType
+              .getFields
+              .map(ptype => new StructField(
+              ptype.getName,
+              toDataType(ptype),
+              ptype.getRepetition != Repetition.REQUIRED))
+            new StructType(fields)
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Looks up the Parquet primitive type name used to store a Catalyst
+   * [[org.apache.spark.sql.catalyst.types.DataType]].
+   *
+   * @param ctype The type to convert
+   * @return `Some` name of the corresponding Parquet primitive type, or `None` when `ctype`
+   *         has no primitive Parquet counterpart here
+   */
+  def fromPrimitiveDataType(ctype: DataType): Option[ParquetPrimitiveTypeName] = {
+    ctype match {
+      case BooleanType => Some(ParquetPrimitiveTypeName.BOOLEAN)
+      // There is no type for Byte or Short, so both are promoted to INT32 alongside Int.
+      case IntegerType | ShortType | ByteType => Some(ParquetPrimitiveTypeName.INT32)
+      case LongType => Some(ParquetPrimitiveTypeName.INT64)
+      case FloatType => Some(ParquetPrimitiveTypeName.FLOAT)
+      case DoubleType => Some(ParquetPrimitiveTypeName.DOUBLE)
+      case StringType => Some(ParquetPrimitiveTypeName.BINARY)
+      case ArrayType(ByteType) => Some(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+      case _ => None
+    }
+  }
+
+  /**
+   * Converts a given Catalyst 
[[org.apache.spark.sql.catalyst.types.DataType]] into
+   * the corresponding Parquet `Type`.
+   *
+   * The conversion follows the rules below:
+   * <ul>
+   *   <li> Primitive types are converted into Parquet's primitive types.</li>
+   *   <li> [[org.apache.spark.sql.catalyst.types.StructType]]s are converted
+   *        into Parquet's `GroupType` with the corresponding field types.</li>
+   *   <li> [[org.apache.spark.sql.catalyst.types.ArrayType]]s are converted
+   *        into a 2-level nested group, where the outer group has the inner
+   *        group as sole field. The inner group has name `values` and
+   *        repetition level `REPEATED` and has the element type of
+   *        the array as schema. We use Parquet's `ConversionPatterns` for this
+   *        purpose.</li>
+   *   <li> [[org.apache.spark.sql.catalyst.types.MapType]]s are converted
+   *        into a nested (2-level) Parquet `GroupType` with two fields: a key
+   *        type and a value type. The nested group has repetition level
+   *        `REPEATED` and name `map`. We use Parquet's `ConversionPatterns`
+   *        for this purpose</li>
+   * </ul>
+   * Parquet's repetition level is generally set according to the following 
rule:
+   * <ul>
+   *   <li> If the call to `fromDataType` is recursive for the element type of an enclosing
+   *   `ArrayType`, then the repetition level is set to `REPEATED`. Keys and values of an
+   *   enclosing `MapType` are converted with repetition level `REQUIRED`.</li>
+   *   <li> Otherwise, if the attribute whose type is converted is `nullable`, 
the Parquet
+   *   type gets repetition level `OPTIONAL` and otherwise `REQUIRED`.</li>
+   * </ul>
+   *
+   * @param ctype The type to convert
+   * @param name The name of the 
[[org.apache.spark.sql.catalyst.expressions.Attribute]]
+   *             whose type is converted
+   * @param nullable When true indicates that the attribute is nullable
+   * @param inArray When true indicates that this is a nested attribute inside 
an array.
+   * @return The corresponding Parquet type.
+   */
+  def fromDataType(
+      ctype: DataType,
+      name: String,
+      nullable: Boolean = true,
+      inArray: Boolean = false): ParquetType = {
+    // Being an array element takes precedence over nullability when picking the repetition.
+    val repetition =
+      if (inArray) Repetition.REPEATED
+      else if (nullable) Repetition.OPTIONAL
+      else Repetition.REQUIRED
+
+    fromPrimitiveDataType(ctype) match {
+      case Some(primitiveName) =>
+        new ParquetPrimitiveType(repetition, primitiveName, name)
+      case None =>
+        ctype match {
+          case ArrayType(elementType) =>
+            // Elements are converted as non-nullable, REPEATED fields.
+            val elementParquetType = fromDataType(
+              elementType,
+              CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+              nullable = false,
+              inArray = true)
+            ConversionPatterns.listType(repetition, name, elementParquetType)
+          case StructType(structFields) =>
+            // Every member keeps its own name and nullability.
+            val memberTypes = structFields.map { member =>
+              fromDataType(member.dataType, member.name, member.nullable, inArray = false)
+            }
+            new ParquetGroupType(repetition, name, memberTypes)
+          case MapType(keyType, valueType) =>
+            // Keys and values are both converted as non-nullable, non-array fields.
+            val keyParquetType = fromDataType(
+              keyType,
+              CatalystConverter.MAP_KEY_SCHEMA_NAME,
+              nullable = false,
+              inArray = false)
+            val valueParquetType = fromDataType(
+              valueType,
+              CatalystConverter.MAP_VALUE_SCHEMA_NAME,
+              nullable = false,
+              inArray = false)
+            ConversionPatterns.mapType(repetition, name, keyParquetType, valueParquetType)
+          case _ =>
+            sys.error(s"Unsupported datatype $ctype")
+        }
+    }
+  }
+
+  /**
+   * Builds one Catalyst attribute per top-level field of the given Parquet schema.
+   *
+   * @param parquetSchema The schema to convert; it is accessed as a group type.
+   * @return Attributes named after the fields, nullable unless the field is `REQUIRED`.
+   */
+  def convertToAttributes(parquetSchema: ParquetType): Seq[Attribute] = {
+    val topLevelFields = parquetSchema.asGroupType().getFields
+    topLevelFields.map { parquetField =>
+      val attributeName = parquetField.getName
+      val attributeType = toDataType(parquetField)
+      val isNullable = parquetField.getRepetition != Repetition.REQUIRED
+      new AttributeReference(attributeName, attributeType, isNullable)()
+    }
+  }
+
+  /**
+   * Assembles a Parquet `MessageType` named "root" whose fields are the converted attributes.
+   *
+   * @param attributes The Catalyst attributes to convert, in schema order.
+   * @return The resulting Parquet message type.
+   */
+  def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
+    val parquetFields = attributes.map { attr =>
+      fromDataType(attr.dataType, attr.name, attr.nullable)
+    }
+    new MessageType("root", parquetFields)
+  }
+
+  /**
+   * Parses a schema string with `DataType(string)` and returns its attributes.
+   *
+   * Only a top-level `StructType` can be turned into attributes; any other parsed type
+   * aborts via `sys.error`.
+   *
+   * @param string The serialized schema, e.g. as produced by `convertToString`.
+   * @return The attributes of the parsed `StructType`.
+   */
+  def convertFromString(string: String): Seq[Attribute] = {
+    DataType(string) match {
+      case s: StructType => s.toAttributes
+      // The original message read "Can convert", which stated the opposite of the failure.
+      case _ => sys.error(s"Cannot convert $string to row")
+    }
+  }
+
+  /** Renders the given attributes as the string form of the `StructType` built from them. */
+  def convertToString(schema: Seq[Attribute]): String = {
+    val structType = StructType.fromAttributes(schema)
+    structType.toString
+  }
+
+  /**
+   * Writes a Parquet metadata file for the given attributes into the directory `origPath`.
+   *
+   * An existing metadata file at that location is deleted first. The written metadata carries
+   * the Parquet schema converted from `attributes` and, under
+   * `RowReadSupport.SPARK_METADATA_KEY`, the string form of the Catalyst schema.
+   *
+   * @param attributes The Catalyst attributes that make up the schema.
+   * @param origPath The directory to write the metadata file into.
+   * @param conf The Hadoop configuration used to resolve the file system and to write.
+   * @throws IllegalArgumentException if `origPath` is null, no file system can be obtained
+   *         for it, or it exists but is not a directory.
+   * @throws IOException if a previous metadata file cannot be deleted.
+   */
+  def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = {
+    if (origPath == null) {
+      throw new IllegalArgumentException("Unable to write Parquet metadata: path is null")
+    }
+    val fs = origPath.getFileSystem(conf)
+    if (fs == null) {
+      throw new IllegalArgumentException(
+        s"Unable to write Parquet metadata: path $origPath is incorrectly formatted")
+    }
+    val path = origPath.makeQualified(fs)
+    if (fs.exists(path) && !fs.getFileStatus(path).isDir) {
+      throw new IllegalArgumentException(s"Expected to write to directory $path but found file")
+    }
+    val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
+    if (fs.exists(metadataPath)) {
+      try {
+        fs.delete(metadataPath, true)
+      } catch {
+        case e: IOException =>
+          // Keep the original exception as the cause so the underlying failure stays visible.
+          throw new IOException(
+            s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath",
+            e)
+      }
+    }
+    // The Catalyst schema travels in the key-value metadata so it can be restored on read.
+    val extraMetadata = new java.util.HashMap[String, String]()
+    extraMetadata.put(
+      RowReadSupport.SPARK_METADATA_KEY,
+      ParquetTypesConverter.convertToString(attributes))
+    // TODO: add extra data, e.g., table name, date, etc.?
+
+    val parquetSchema: MessageType =
+      ParquetTypesConverter.convertFromAttributes(attributes)
+    val metaData: FileMetaData = new FileMetaData(
+      parquetSchema,
+      extraMetadata,
+      "Spark")
+
+    ParquetRelation.enableLogForwarding()
+    // A single footer with no row-group blocks is enough to describe the schema.
+    ParquetFileWriter.writeMetadataFile(
+      conf,
+      path,
+      new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil)
+  }
+
+  /**
+   * Try to read Parquet metadata at the given Path. We first see if there is a summary file
+   * in the given directory. If so, this is used. Else we read the footers of the Parquet
+   * files in that directory and use the first one.
+   *
+   * @param origPath The directory in which we expect one (or more) Parquet files.
+   * @param configuration The Hadoop configuration to use; when `None`, the configuration of a
+   *                      newly created `Job` is used instead.
+   * @return The `ParquetMetadata` containing among other things the schema.
+   * @throws IllegalArgumentException if `origPath` is null, no file system can be obtained
+   *         for it, it is not a directory, or no footer is found in it.
+   */
+  def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = {
+    if (origPath == null) {
+      throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
+    }
+    // `getOrElse` takes its default by name, so a Job is only built when no configuration
+    // was supplied by the caller.
+    val conf = configuration.getOrElse(ContextUtil.getConfiguration(new Job()))
+    val fs: FileSystem = origPath.getFileSystem(conf)
+    if (fs == null) {
+      throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
+    }
+    val path = origPath.makeQualified(fs)
+    if (!fs.getFileStatus(path).isDir) {
+      throw new IllegalArgumentException(
+        s"Expected $path to be a directory with Parquet files/metadata")
+    }
+    ParquetRelation.enableLogForwarding()
+    val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
+    // if this is a new table that was just created we will find only the metadata file
+    if (fs.exists(metadataPath) && fs.isFile(metadataPath)) {
+      ParquetFileReader.readFooter(conf, metadataPath)
+    } else {
+      // there may be one or more Parquet files in the given directory
+      val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path))
+      // TODO: for now we assume that all footers (if there is more than one) have identical
+      // metadata; we may want to add a check here at some point
+      if (footers.isEmpty) {
+        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")
+      }
+      footers.get(0).getParquetMetadata
+    }
+  }
+
+  /**
+   * Reads in Parquet Metadata from the given path and tries to extract the schema
+   * (Catalyst attributes) from the application-specific key-value map. If the map has no
+   * entry for `RowReadSupport.SPARK_METADATA_KEY` it falls back to converting from the
+   * Parquet file schema, which may lead to an upcast of types (e.g., {byte, short} to int).
+   *
+   * The metadata is read only once; both the key-value map and the fallback schema are
+   * taken from that single read.
+   *
+   * @param origPath The path at which we expect one (or more) Parquet files.
+   * @param conf The Hadoop configuration to use.
+   * @return A list of attributes that make up the schema.
+   */
+  def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = {
+    val fileMetaData = readMetaData(origPath, conf).getFileMetaData
+    val keyValueMetadata: java.util.Map[String, String] = fileMetaData.getKeyValueMetaData
+    // `Option(...)` turns a missing (null) entry into None.
+    Option(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY)) match {
+      case Some(serializedSchema) =>
+        convertFromString(serializedSchema)
+      case None =>
+        val attributes = convertToAttributes(fileMetaData.getSchema)
+        log.warn(s"Falling back to schema conversion from Parquet types; result: $attributes")
+        attributes
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/357d16bc/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 9810520..0c239d0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -26,15 +26,16 @@ import parquet.hadoop.ParquetFileWriter
 import parquet.hadoop.util.ContextUtil
 import parquet.schema.MessageTypeParser
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.util.getTempFilePath
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.TestData
 import org.apache.spark.sql.SchemaRDD
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.expressions.Equals
-import org.apache.spark.sql.catalyst.types.IntegerType
+import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, Star}
 import org.apache.spark.util.Utils
 
 // Implicits
@@ -56,15 +57,37 @@ case class OptionalReflectData(
     doubleField: Option[Double],
     booleanField: Option[Boolean])
 
+// Record with one Int field and one String field; also used as the struct field of `Data`.
+case class Nested(i: Int, s: String)
+
+// Record pairing a sequence-of-Int field with a `Nested` struct field.
+// NOTE(review): presumably a fixture for nested-type tests outside this hunk; confirm.
+case class Data(array: Seq[Int], nested: Nested)
+
+// Record with one field for each of String, Int, Long, Float, Double, Short, Byte and
+// Boolean; written and read back by the "Read/Write All Types" test.
+case class AllDataTypes(
+    stringField: String,
+    intField: Int,
+    longField: Long,
+    floatField: Float,
+    doubleField: Double,
+    shortField: Short,
+    byteField: Byte,
+    booleanField: Boolean)
+
 class ParquetQuerySuite extends QueryTest with FunSuiteLike with 
BeforeAndAfterAll {
   import TestData._
   TestData // Load test data tables.
 
   var testRDD: SchemaRDD = null
 
+  // TODO: remove this once SqlParser can parse nested select statements
+  var nestedParserSqlContext: NestedParserSQLContext = null
+
   override def beforeAll() {
+    nestedParserSqlContext = new 
NestedParserSQLContext(TestSQLContext.sparkContext)
     ParquetTestData.writeFile()
     ParquetTestData.writeFilterFile()
+    ParquetTestData.writeNestedFile1()
+    ParquetTestData.writeNestedFile2()
+    ParquetTestData.writeNestedFile3()
+    ParquetTestData.writeNestedFile4()
     testRDD = parquetFile(ParquetTestData.testDir.toString)
     testRDD.registerAsTable("testsource")
     parquetFile(ParquetTestData.testFilterDir.toString)
@@ -74,9 +97,33 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike 
with BeforeAndAfterA
+  // Recursively deletes the test directories exposed by ParquetTestData.
+  // NOTE(review): presumably these are the directories populated in beforeAll; confirm.
   override def afterAll() {
     Utils.deleteRecursively(ParquetTestData.testDir)
     Utils.deleteRecursively(ParquetTestData.testFilterDir)
+    Utils.deleteRecursively(ParquetTestData.testNestedDir1)
+    Utils.deleteRecursively(ParquetTestData.testNestedDir2)
+    Utils.deleteRecursively(ParquetTestData.testNestedDir3)
+    Utils.deleteRecursively(ParquetTestData.testNestedDir4)
     // here we should also unregister the table??
   }
 
+  // Round-trips 256 rows covering every field type of AllDataTypes through a Parquet file
+  // and checks each field of each row after reading the file back.
+  test("Read/Write All Types") {
+    val tempDir = getTempFilePath("parquetTest").getCanonicalPath
+    val range = (0 to 255)
+    try {
+      TestSQLContext.sparkContext.parallelize(range)
+        .map(x =>
+          AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
+        .saveAsParquetFile(tempDir)
+      val result = parquetFile(tempDir).collect()
+      range.foreach {
+        i =>
+          assert(
+            result(i).getString(0) == s"$i",
+            s"row $i String field did not match, got ${result(i).getString(0)}")
+          assert(result(i).getInt(1) === i)
+          assert(result(i).getLong(2) === i.toLong)
+          assert(result(i).getFloat(3) === i.toFloat)
+          assert(result(i).getDouble(4) === i.toDouble)
+          assert(result(i).getShort(5) === i.toShort)
+          assert(result(i).getByte(6) === i.toByte)
+          assert(result(i).getBoolean(7) === (i % 2 == 0))
+      }
+    } finally {
+      // Remove the written Parquet directory even when an assertion fails, matching the
+      // cleanup the other tests in this suite perform with Utils.deleteRecursively.
+      Utils.deleteRecursively(new java.io.File(tempDir))
+    }
+  }
+
   test("self-join parquet files") {
     val x = ParquetTestData.testData.as('x)
     val y = ParquetTestData.testData.as('y)
@@ -154,7 +201,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike 
with BeforeAndAfterA
       path,
       TestSQLContext.sparkContext.hadoopConfiguration)
     assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
-    val metaData = ParquetTypesConverter.readMetaData(path)
+    val metaData = ParquetTypesConverter.readMetaData(path, 
Some(ContextUtil.getConfiguration(job)))
     assert(metaData != null)
     ParquetTestData
       .testData
@@ -197,10 +244,37 @@ class ParquetQuerySuite extends QueryTest with 
FunSuiteLike with BeforeAndAfterA
       assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
     }
     Utils.deleteRecursively(file)
-    assert(true)
   }
 
-  test("insert (appending) to same table via Scala API") {
+  // Fills a new Parquet-backed table with INSERT OVERWRITE, then appends the same rows with
+  // INSERT INTO and checks the row count and selected rows after each step.
+  test("Insert (overwrite) via Scala API") {
+    val dirname = Utils.createTempDir()
+    val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+    source_rdd.registerAsTable("source")
+    val dest_rdd = createParquetFile[TestRDDEntry](dirname.toString)
+    dest_rdd.registerAsTable("dest")
+    // Step 1: overwrite; the table should then hold exactly the 100 source rows.
+    sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
+    val rdd_copy1 = sql("SELECT * FROM dest").collect()
+    assert(rdd_copy1.size === 100)
+    assert(rdd_copy1(0).apply(0) === 1)
+    assert(rdd_copy1(0).apply(1) === "val_1")
+    // TODO: why does collecting break things? It seems InsertIntoParquet::execute() is
+    // executed twice otherwise?!
+    // Step 2: append (no collect, see TODO above); the row count should double to 200.
+    sql("INSERT INTO dest SELECT * FROM source")
+    val rdd_copy2 = sql("SELECT * FROM dest").collect()
+    assert(rdd_copy2.size === 200)
+    assert(rdd_copy2(0).apply(0) === 1)
+    assert(rdd_copy2(0).apply(1) === "val_1")
+    assert(rdd_copy2(99).apply(0) === 100)
+    assert(rdd_copy2(99).apply(1) === "val_100")
+    // Row 100 is expected to be the first row of the appended copy.
+    assert(rdd_copy2(100).apply(0) === 1)
+    assert(rdd_copy2(100).apply(1) === "val_1")
+    Utils.deleteRecursively(dirname)
+  }
+
+  test("Insert (appending) to same table via Scala API") {
+    // TODO: why does collecting break things? It seems 
InsertIntoParquet::execute() is
+    // executed twice otherwise?!
     sql("INSERT INTO testsource SELECT * FROM testsource")
     val double_rdd = sql("SELECT * FROM testsource").collect()
     assert(double_rdd != null)
@@ -363,4 +437,272 @@ class ParquetQuerySuite extends QueryTest with 
FunSuiteLike with BeforeAndAfterA
     val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10")
     assert(query.collect().size === 10)
   }
+
+  // Loads the first nested test file and checks both rows, including the nested array and
+  // struct values of the first row.
+  test("Importing nested Parquet file (Addressbook)") {
+    val rows = TestSQLContext
+      .parquetFile(ParquetTestData.testNestedDir1.toString)
+      .toSchemaRDD
+      .collect()
+    assert(rows != null)
+    assert(rows.size === 2)
+    val julien = rows(0)
+    val anonymous = rows(1)
+    assert(julien != null)
+    assert(anonymous != null)
+    assert(julien.size === 3)
+    // The second row carries only an owner name; its other two fields are null.
+    assert(anonymous(1) === null)
+    assert(anonymous(2) === null)
+    assert(anonymous(0) === "A. Nonymous")
+    assert(julien(0) === "Julien Le Dem")
+    val ownerNumbers = julien(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+    val contacts = julien(2).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+    assert(ownerNumbers != null)
+    assert(ownerNumbers(0) === "555 123 4567")
+    assert(ownerNumbers(2) === "XXX XXX XXXX")
+    // Each contact is a two-field struct.
+    val firstContact = contacts(0).asInstanceOf[CatalystConverter.StructScalaType[_]]
+    assert(firstContact.size === 2)
+    assert(firstContact(0) === "Dmitriy Ryaboy")
+    assert(firstContact(1) === "555 987 6543")
+    val secondContact = contacts(1).asInstanceOf[CatalystConverter.StructScalaType[_]]
+    assert(secondContact(0) === "Chris Aniszczyk")
+  }
+
+  // Loads the second nested test file and walks its single row: two scalars, an array of
+  // longs, an array of structs and a three-level nested array.
+  test("Importing nested Parquet file (nested numbers)") {
+    val rows = TestSQLContext
+      .parquetFile(ParquetTestData.testNestedDir2.toString)
+      .toSchemaRDD
+      .collect()
+    assert(rows.size === 1, "number of top-level rows incorrect")
+    val row = rows(0)
+    assert(row.size === 5, "number of fields in row incorrect")
+    assert(row(0) === 1)
+    assert(row(1) === 7)
+    // Field 2: three long values, each a successive power of two starting at 2^32.
+    val longs = row(2).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+    assert(longs.size === 3)
+    assert(longs(0) === (1.toLong << 32))
+    assert(longs(1) === (1.toLong << 33))
+    assert(longs(2) === (1.toLong << 34))
+    // Field 3: an array whose first element is a two-field struct.
+    val firstEntry = row(3)
+      .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
+      .asInstanceOf[CatalystConverter.StructScalaType[_]]
+    assert(firstEntry.size === 2)
+    assert(firstEntry(0) === 2.5)
+    assert(firstEntry(1) === false)
+    // Field 4: arrays nested three levels deep.
+    val outer = row(4).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+    assert(outer.size === 2)
+    val firstInner = outer(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+    assert(firstInner.size === 2)
+    assert(firstInner(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7)
+    assert(firstInner(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8)
+    val secondInner = outer(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+    assert(secondInner.size === 1)
+    assert(secondInner(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9)
+  }
+
+  // Filters the address book by owner through the Scala DSL and projects two aliased columns.
+  test("Simple query on addressbook") {
+    val addressBook = TestSQLContext
+      .parquetFile(ParquetTestData.testNestedDir1.toString)
+      .toSchemaRDD
+    val matches = addressBook
+      .where('owner === "Julien Le Dem")
+      .select('owner as 'a, 'contacts as 'c)
+      .collect()
+    assert(matches.size === 1)
+    assert(matches(0)(0) === "Julien Le Dem")
+  }
+
+  // Projects a field of the second contact (`contacts[1].name`) through the nested SQL parser.
+  test("Projection in addressbook") {
+    val addressBook = nestedParserSqlContext
+      .parquetFile(ParquetTestData.testNestedDir1.toString)
+      .toSchemaRDD
+    addressBook.registerAsTable("data")
+    val rows = nestedParserSqlContext
+      .sql("SELECT owner, contacts[1].name FROM data")
+      .collect()
+    assert(rows.size === 2)
+    assert(rows(0).size === 2)
+    assert(rows(0)(0) === "Julien Le Dem")
+    assert(rows(0)(1) === "Chris Aniszczyk")
+    // The second owner yields null for the projected contact name.
+    assert(rows(1)(0) === "A. Nonymous")
+    assert(rows(1)(1) === null)
+  }
+
+  // Queries the second nested test file with item and field access expressions.
+  test("Simple query on nested int data") {
+    val nested = nestedParserSqlContext
+      .parquetFile(ParquetTestData.testNestedDir2.toString)
+      .toSchemaRDD
+    nested.registerAsTable("data")
+    // A single field of the first array element.
+    val valueRows = nestedParserSqlContext.sql("SELECT entries[0].value FROM data").collect()
+    assert(valueRows.size === 1)
+    assert(valueRows(0).size === 1)
+    assert(valueRows(0)(0) === 2.5)
+    // The whole first array element, returned as a struct.
+    val entryRows = nestedParserSqlContext.sql("SELECT entries[0] FROM data").collect()
+    assert(entryRows.size === 1)
+    val entry = entryRows(0)(0).asInstanceOf[CatalystConverter.StructScalaType[_]]
+    assert(entry.size === 2)
+    assert(entry(0) === 2.5)
+    assert(entry(1) === false)
+    // The complete three-level nested array column.
+    val outerRows = nestedParserSqlContext.sql("SELECT outerouter FROM data").collect()
+    val outer = outerRows(0)(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+    val firstInner = outer(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
+    assert(firstInner(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7)
+    assert(firstInner(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8)
+    assert(outer(1)
+      .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
+      .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9)
+  }
+
+  // Runs three chained item/field access queries; each must return one row with one column.
+  test("nested structs") {
+    val nested = nestedParserSqlContext
+      .parquetFile(ParquetTestData.testNestedDir3.toString)
+      .toSchemaRDD
+    nested.registerAsTable("data")
+    // Query text paired with the boolean it is expected to return, in the original order.
+    val expectations = Seq(
+      "SELECT booleanNumberPairs[0].value[0].truth FROM data" -> false,
+      "SELECT booleanNumberPairs[0].value[1].truth FROM data" -> true,
+      "SELECT booleanNumberPairs[1].value[0].truth FROM data" -> false)
+    expectations.foreach { case (query, expectedTruth) =>
+      val rows = nestedParserSqlContext.sql(query).collect()
+      assert(rows.size === 1)
+      assert(rows(0).size === 1)
+      assert(rows(0)(0) === expectedTruth)
+    }
+  }
+
+  // Reads a map column both as a whole value and through a key lookup in SQL.
+  test("simple map") {
+    val mapData = TestSQLContext
+      .parquetFile(ParquetTestData.testNestedDir4.toString)
+      .toSchemaRDD
+    mapData.registerAsTable("mapTable")
+    val mapRows = sql("SELECT data1 FROM mapTable").collect()
+    assert(mapRows.size === 1)
+    val data1 = mapRows(0)(0).asInstanceOf[CatalystConverter.MapScalaType[String, _]]
+    assert(data1.getOrElse("key1", 0) === 1)
+    assert(data1.getOrElse("key2", 0) === 2)
+    val lookupRows = sql("""SELECT data1["key1"] FROM mapTable""").collect()
+    assert(lookupRows(0)(0) === 1)
+  }
+
+  // Reads a map column whose values are structs, as a whole value and through SQL lookups.
+  test("map with struct values") {
+    val mapData = nestedParserSqlContext
+      .parquetFile(ParquetTestData.testNestedDir4.toString)
+      .toSchemaRDD
+    mapData.registerAsTable("mapTable")
+    val mapRows = nestedParserSqlContext.sql("SELECT data2 FROM mapTable").collect()
+    assert(mapRows.size === 1)
+    val structMap = mapRows(0)(0)
+      .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
+    val seven = structMap.getOrElse("seven", null)
+    assert(seven != null)
+    assert(seven(0) === 42)
+    assert(seven(1) === "the answer")
+    // The "eight" entry has a null second field.
+    val eight = structMap.getOrElse("eight", null)
+    assert(eight != null)
+    assert(eight(0) === 49)
+    assert(eight(1) === null)
+    val payloadRows = nestedParserSqlContext
+      .sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM mapTable""")
+      .collect()
+    assert(payloadRows.size === 1)
+    assert(payloadRows(0)(0) === 42.toLong)
+    assert(payloadRows(0)(1) === "the answer")
+  }
+
+  // Saves the address book as a new Parquet file, reloads it and repeats the projection query.
+  test("Writing out Addressbook and reading it back in") {
+    // TODO: find out why CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
+    // has no effect in this test case
+    val outputDir = Utils.createTempDir()
+    // NOTE(review): the directory is removed right away, presumably because
+    // saveAsParquetFile expects a path that does not exist yet -- confirm.
+    Utils.deleteRecursively(outputDir)
+    val addressBook = nestedParserSqlContext
+      .parquetFile(ParquetTestData.testNestedDir1.toString)
+      .toSchemaRDD
+    addressBook.saveAsParquetFile(outputDir.toString)
+    nestedParserSqlContext
+      .parquetFile(outputDir.toString)
+      .toSchemaRDD
+      .registerAsTable("tmpcopy")
+    val rows = nestedParserSqlContext
+      .sql("SELECT owner, contacts[1].name FROM tmpcopy")
+      .collect()
+    assert(rows.size === 2)
+    assert(rows(0).size === 2)
+    assert(rows(0)(0) === "Julien Le Dem")
+    assert(rows(0)(1) === "Chris Aniszczyk")
+    assert(rows(1)(0) === "A. Nonymous")
+    assert(rows(1)(1) === null)
+    Utils.deleteRecursively(outputDir)
+  }
+
+  // Saves the map test data as a new Parquet file, reloads it and repeats the map lookups.
+  test("Writing out Map and reading it back in") {
+    val mapData = nestedParserSqlContext
+      .parquetFile(ParquetTestData.testNestedDir4.toString)
+      .toSchemaRDD
+    val outputDir = Utils.createTempDir()
+    // NOTE(review): the directory is removed right away, presumably because
+    // saveAsParquetFile expects a path that does not exist yet -- confirm.
+    Utils.deleteRecursively(outputDir)
+    mapData.saveAsParquetFile(outputDir.toString)
+    nestedParserSqlContext
+      .parquetFile(outputDir.toString)
+      .toSchemaRDD
+      .registerAsTable("tmpmapcopy")
+    val lookupRows = nestedParserSqlContext
+      .sql("""SELECT data1["key2"] FROM tmpmapcopy""")
+      .collect()
+    assert(lookupRows.size === 1)
+    assert(lookupRows(0)(0) === 2)
+    val mapRows = nestedParserSqlContext.sql("SELECT data2 FROM tmpmapcopy").collect()
+    assert(mapRows.size === 1)
+    val structMap = mapRows(0)(0)
+      .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
+    val seven = structMap.getOrElse("seven", null)
+    assert(seven != null)
+    assert(seven(0) === 42)
+    assert(seven(1) === "the answer")
+    // The "eight" entry has a null second field.
+    val eight = structMap.getOrElse("eight", null)
+    assert(eight != null)
+    assert(eight(0) === 49)
+    assert(eight(1) === null)
+    val payloadRows = nestedParserSqlContext
+      .sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM tmpmapcopy""")
+      .collect()
+    assert(payloadRows.size === 1)
+    assert(payloadRows(0)(0) === 42.toLong)
+    assert(payloadRows(0)(1) === "the answer")
+    Utils.deleteRecursively(outputDir)
+  }
+}
+
+// TODO: the code below is needed temporarily until the standard parser is able to parse
+// nested field expressions correctly
+
+/** A [[SQLContext]] whose SQL parser is replaced by a [[NestedSqlParser]]. */
+class NestedParserSQLContext(@transient override val sparkContext: SparkContext)
+  extends SQLContext(sparkContext) {
+  override protected[sql] val parser = new NestedSqlParser()
+}
+
+/** A [[SqlLexical]] with a narrowed identifier character set and `.` as an extra delimiter. */
+class NestedSqlLexical(override val keywords: Seq[String]) extends SqlLexical(keywords) {
+  // Identifier characters are letters and the underscore only.
+  override def identChar = letter | elem('_')
+  // Register "." as a delimiter, presumably so that `a.b` is split into separate tokens.
+  delimiters += (".")
+}
+
+/**
+ * A [[SqlParser]] that lexes with [[NestedSqlLexical]] and whose `baseExpression` also
+ * accepts item access (`expr[expr]`) and field access (`expr.name`).
+ */
+class NestedSqlParser extends SqlParser {
+  // Use the lexer defined in this file, built from this parser's reserved words.
+  override val lexical = new NestedSqlLexical(reservedWords)
+
+  // NOTE(review): the first two alternatives start with `expression` itself; this presumably
+  // relies on packrat parsing to cope with the left recursion -- confirm.
+  // NOTE(review): the remaining alternatives are presumably carried over from
+  // SqlParser.baseExpression -- confirm against the parent class.
+  override protected lazy val baseExpression: PackratParser[Expression] =
+    // `base[ordinal]` becomes GetItem(base, ordinal).
+    expression ~ "[" ~ expression <~ "]" ^^ {
+      case base ~ _ ~ ordinal => GetItem(base, ordinal)
+    } |
+    // `base.fieldName` becomes GetField(base, fieldName).
+    expression ~ "." ~ ident ^^ {
+      case base ~ _ ~ fieldName => GetField(base, fieldName)
+    } |
+    TRUE ^^^ Literal(true, BooleanType) |
+    FALSE ^^^ Literal(false, BooleanType) |
+    cast |
+    "(" ~> expression <~ ")" |
+    function |
+    "-" ~> literal ^^ UnaryMinus |
+    ident ^^ UnresolvedAttribute |
+    "*" ^^^ Star(None) |
+    literal
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/357d16bc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6828434..f923d68 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -208,7 +208,9 @@ object HiveMetastoreTypes extends RegexParsers {
     }
 
   protected lazy val structType: Parser[DataType] =
-    "struct" ~> "<" ~> repsep(structField,",") <~ ">" ^^ StructType
+    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ { fields => // parsed struct fields
+      new StructType(fields)
+    }
 
   protected lazy val dataType: Parser[DataType] =
     arrayType |

Reply via email to