Repository: spark Updated Branches: refs/heads/master f397e92eb -> f479cf374
http://git-wip-us.apache.org/repos/asf/spark/blob/f479cf37/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala new file mode 100644 index 0000000..f904636 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.IOException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.Job
+
+import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
+import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
+import parquet.hadoop.util.ContextUtil
+import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveType, MessageType}
+import parquet.schema.{GroupType => ParquetGroupType, OriginalType => ParquetOriginalType, ConversionPatterns}
+import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
+import parquet.schema.Type.Repetition
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.types._
+
+// Implicits
+import scala.collection.JavaConversions._
+
+/**
+ * Conversions between Catalyst data types / attributes and Parquet schema types, plus helpers
+ * that write and read the Parquet summary metadata file of a table directory.
+ */
+private[parquet] object ParquetTypesConverter extends Logging {
+  /** Returns true if the runtime class of `ctype` is a Catalyst [[PrimitiveType]]. */
+  def isPrimitiveType(ctype: DataType): Boolean =
+    classOf[PrimitiveType] isAssignableFrom ctype.getClass
+
+  /**
+   * Maps a Parquet primitive type name to the Catalyst type used to represent it.
+   * `BINARY` is read as [[StringType]] and `FIXED_LEN_BYTE_ARRAY` as `ArrayType(ByteType)`.
+   * `INT96` and any other unlisted type name are rejected with an error.
+   *
+   * @param parquetType The Parquet primitive type name to convert.
+   * @return The corresponding Catalyst type.
+   */
+  def toPrimitiveDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match {
+    case ParquetPrimitiveTypeName.BINARY => StringType
+    case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
+    case ParquetPrimitiveTypeName.DOUBLE => DoubleType
+    case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => ArrayType(ByteType)
+    case ParquetPrimitiveTypeName.FLOAT => FloatType
+    case ParquetPrimitiveTypeName.INT32 => IntegerType
+    case ParquetPrimitiveTypeName.INT64 => LongType
+    case ParquetPrimitiveTypeName.INT96 =>
+      // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
+      sys.error("Potential loss of precision: cannot convert INT96")
+    case _ => sys.error(
+      s"Unsupported parquet datatype $parquetType")
+  }
+
+  /**
+   * Converts a given Parquet `Type` into the corresponding
+   * [[org.apache.spark.sql.catalyst.types.DataType]].
+   *
+   * We apply the following conversion rules:
+   * <ul>
+   *   <li> Primitive types are converted to the corresponding primitive type.</li>
+   *   <li> Group types carrying an `OriginalType` hint of `LIST` or `MAP` are converted into an
+   *        [[ArrayType]] or a [[MapType]] respectively, based on that hint alone.</li>
+   *   <li> Other group types with a single field are treated as follows:<ul>
+   *     <li> If that field is a `REPEATED` group named `CatalystConverter.MAP_SCHEMA_NAME`
+   *          with exactly two fields named `CatalystConverter.MAP_KEY_SCHEMA_NAME` and
+   *          `CatalystConverter.MAP_VALUE_SCHEMA_NAME`, the surrounding group is converted
+   *          into a [[MapType]] with the corresponding key and value (value possibly
+   *          complex) types.</li>
+   *     <li> Otherwise, if that field (primitive or group) is `REPEATED` and named
+   *          `CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME`, the surrounding group is
+   *          converted into an [[ArrayType]] with the field's type as element type.</li>
+   *   </ul></li>
+   *   <li> All remaining group types are converted into a [[StructType]] with the
+   *        corresponding field types.</li>
+   * </ul>
+   * Map keys and values are asserted to be `REQUIRED`, i.e. we currently assume they are not
+   * nullable. Struct fields are `nullable` if and only if their Parquet repetition level is
+   * not `REQUIRED`.
+   *
+   * @param parquetType The type to convert.
+   * @return The corresponding Catalyst type.
+   */
+  def toDataType(parquetType: ParquetType): DataType = {
+    def correspondsToMap(groupType: ParquetGroupType): Boolean = {
+      if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) {
+        false
+      } else {
+        // This mostly follows the convention in ``parquet.schema.ConversionPatterns``
+        val keyValueGroup = groupType.getFields.apply(0).asGroupType()
+        keyValueGroup.getRepetition == Repetition.REPEATED &&
+          keyValueGroup.getName == CatalystConverter.MAP_SCHEMA_NAME &&
+          keyValueGroup.getFieldCount == 2 &&
+          keyValueGroup.getFields.apply(0).getName == CatalystConverter.MAP_KEY_SCHEMA_NAME &&
+          keyValueGroup.getFields.apply(1).getName == CatalystConverter.MAP_VALUE_SCHEMA_NAME
+      }
+    }
+
+    def correspondsToArray(groupType: ParquetGroupType): Boolean = {
+      groupType.getFieldCount == 1 &&
+        groupType.getFieldName(0) == CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME &&
+        groupType.getFields.apply(0).getRepetition == Repetition.REPEATED
+    }
+
+    // Builds a MapType from the nested (key, value) group of a Parquet map; shared by the
+    // OriginalType.MAP hint branch and the name-based detection branch.
+    def toMapType(keyValueGroup: ParquetGroupType): DataType = {
+      val keyType = toDataType(keyValueGroup.getFields.apply(0))
+      assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
+      val valueType = toDataType(keyValueGroup.getFields.apply(1))
+      assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
+      new MapType(keyType, valueType)
+    }
+
+    if (parquetType.isPrimitive) {
+      toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName)
+    } else {
+      val groupType = parquetType.asGroupType()
+      parquetType.getOriginalType match {
+        // if the schema was constructed programmatically there may be hints how to convert
+        // it inside the metadata via the OriginalType field
+        case ParquetOriginalType.LIST => { // TODO: check enums!
+          assert(groupType.getFieldCount == 1)
+          val field = groupType.getFields.apply(0)
+          new ArrayType(toDataType(field))
+        }
+        case ParquetOriginalType.MAP => {
+          // Check the field count first so an empty group fails this assertion instead of
+          // throwing IndexOutOfBoundsException from getFields.apply(0).
+          assert(
+            groupType.getFieldCount >= 1 && !groupType.getFields.apply(0).isPrimitive,
+            "Parquet Map type malformatted: expected nested group for map!")
+          val keyValueGroup = groupType.getFields.apply(0).asGroupType()
+          assert(
+            keyValueGroup.getFieldCount == 2,
+            "Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
+          toMapType(keyValueGroup)
+        }
+        case _ => {
+          // Note: the order of these checks is important!
+          if (correspondsToMap(groupType)) { // MapType
+            toMapType(groupType.getFields.apply(0).asGroupType())
+          } else if (correspondsToArray(groupType)) { // ArrayType
+            val elementType = toDataType(groupType.getFields.apply(0))
+            new ArrayType(elementType)
+          } else { // everything else: StructType
+            val fields = groupType
+              .getFields
+              .map(ptype => new StructField(
+                ptype.getName,
+                toDataType(ptype),
+                ptype.getRepetition != Repetition.REQUIRED))
+            new StructType(fields)
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * For a given Catalyst [[org.apache.spark.sql.catalyst.types.DataType]] return
+   * the name of the corresponding Parquet primitive type or None if the given type
+   * is not primitive. `ArrayType(ByteType)` is mapped to `FIXED_LEN_BYTE_ARRAY`, and
+   * `ShortType`/`ByteType` are widened to `INT32`.
+   *
+   * @param ctype The type to convert
+   * @return The name of the corresponding Parquet primitive type
+   */
+  def fromPrimitiveDataType(ctype: DataType):
+      Option[ParquetPrimitiveTypeName] = ctype match {
+    case StringType => Some(ParquetPrimitiveTypeName.BINARY)
+    case BooleanType => Some(ParquetPrimitiveTypeName.BOOLEAN)
+    case DoubleType => Some(ParquetPrimitiveTypeName.DOUBLE)
+    case ArrayType(ByteType) =>
+      Some(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+    case FloatType => Some(ParquetPrimitiveTypeName.FLOAT)
+    case IntegerType => Some(ParquetPrimitiveTypeName.INT32)
+    // There is no type for Byte or Short so we promote them to INT32.
+    case ShortType => Some(ParquetPrimitiveTypeName.INT32)
+    case ByteType => Some(ParquetPrimitiveTypeName.INT32)
+    case LongType => Some(ParquetPrimitiveTypeName.INT64)
+    case _ => None
+  }
+
+  /**
+   * Converts a given Catalyst [[org.apache.spark.sql.catalyst.types.DataType]] into
+   * the corresponding Parquet `Type`.
+   *
+   * The conversion follows the rules below:
+   * <ul>
+   *   <li> Primitive types are converted into Parquet's primitive types.</li>
+   *   <li> [[org.apache.spark.sql.catalyst.types.StructType]]s are converted
+   *        into Parquet's `GroupType` with the corresponding field types.</li>
+   *   <li> [[org.apache.spark.sql.catalyst.types.ArrayType]]s are converted
+   *        via Parquet's `ConversionPatterns.listType`; the element type is converted
+   *        under the name `CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME` with
+   *        repetition level `REPEATED`.</li>
+   *   <li> [[org.apache.spark.sql.catalyst.types.MapType]]s are converted via Parquet's
+   *        `ConversionPatterns.mapType`; the key and value types are converted under the
+   *        names `CatalystConverter.MAP_KEY_SCHEMA_NAME` and
+   *        `CatalystConverter.MAP_VALUE_SCHEMA_NAME` with repetition level `REQUIRED`.</li>
+   * </ul>
+   * Parquet's repetition level is generally set according to the following rule:
+   * <ul>
+   *   <li> If `inArray` is true (the recursive call for an enclosing `ArrayType`'s
+   *        elements), the repetition level is set to `REPEATED`.</li>
+   *   <li> Otherwise, if the attribute whose type is converted is `nullable`, the Parquet
+   *        type gets repetition level `OPTIONAL` and otherwise `REQUIRED`.</li>
+   * </ul>
+   *
+   * @param ctype The type to convert
+   * @param name The name of the [[org.apache.spark.sql.catalyst.expressions.Attribute]]
+   *             whose type is converted
+   * @param nullable When true indicates that the attribute is nullable
+   * @param inArray When true indicates that this is a nested attribute inside an array.
+   * @return The corresponding Parquet type.
+   */
+  def fromDataType(
+      ctype: DataType,
+      name: String,
+      nullable: Boolean = true,
+      inArray: Boolean = false): ParquetType = {
+    val repetition =
+      if (inArray) {
+        Repetition.REPEATED
+      } else {
+        if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED
+      }
+    fromPrimitiveDataType(ctype) match {
+      case Some(primitiveTypeName) =>
+        new ParquetPrimitiveType(repetition, primitiveTypeName, name)
+      case None =>
+        ctype match {
+          case ArrayType(elementType) => {
+            val parquetElementType = fromDataType(
+              elementType,
+              CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+              nullable = false,
+              inArray = true)
+            ConversionPatterns.listType(repetition, name, parquetElementType)
+          }
+          case StructType(structFields) => {
+            val fields = structFields.map {
+              field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
+            }
+            new ParquetGroupType(repetition, name, fields)
+          }
+          case MapType(keyType, valueType) => {
+            val parquetKeyType =
+              fromDataType(
+                keyType,
+                CatalystConverter.MAP_KEY_SCHEMA_NAME,
+                nullable = false,
+                inArray = false)
+            val parquetValueType =
+              fromDataType(
+                valueType,
+                CatalystConverter.MAP_VALUE_SCHEMA_NAME,
+                nullable = false,
+                inArray = false)
+            ConversionPatterns.mapType(
+              repetition,
+              name,
+              parquetKeyType,
+              parquetValueType)
+          }
+          case _ => sys.error(s"Unsupported datatype $ctype")
+        }
+    }
+  }
+
+  /**
+   * Converts the top-level fields of a Parquet group (message) type into Catalyst attributes.
+   * An attribute is nullable iff the field's repetition level is not `REQUIRED`.
+   */
+  def convertToAttributes(parquetSchema: ParquetType): Seq[Attribute] = {
+    parquetSchema
+      .asGroupType()
+      .getFields
+      .map(
+        field =>
+          new AttributeReference(
+            field.getName,
+            toDataType(field),
+            field.getRepetition != Repetition.REQUIRED)())
+  }
+
+  /** Builds a Parquet `MessageType` named `root` whose fields are the converted attributes. */
+  def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
+    val fields = attributes.map(
+      attribute =>
+        fromDataType(attribute.dataType, attribute.name, attribute.nullable))
+    new MessageType("root", fields)
+  }
+
+  /**
+   * Parses a schema string (as produced by `convertToString`) back into attributes.
+   * Fails if the string does not describe a `StructType`.
+   */
+  def convertFromString(string: String): Seq[Attribute] = {
+    DataType(string) match {
+      case s: StructType => s.toAttributes
+      case _ => sys.error(s"Can't convert $string to row")
+    }
+  }
+
+  /** Serializes the attributes as the string form of a `StructType`. */
+  def convertToString(schema: Seq[Attribute]): String = {
+    StructType.fromAttributes(schema).toString
+  }
+
+  /**
+   * Writes a Parquet summary metadata file (`ParquetFileWriter.PARQUET_METADATA_FILE`) into
+   * the directory `origPath`, deleting any previous one first. The Catalyst schema is stored
+   * as a string under `RowReadSupport.SPARK_METADATA_KEY` in the key-value metadata.
+   *
+   * @param attributes The schema to write.
+   * @param origPath The table directory.
+   * @param conf The Hadoop configuration used to resolve the file system.
+   * @throws IllegalArgumentException if the path is null, its file system cannot be
+   *                                  resolved, or it names an existing file.
+   * @throws IOException if a previous metadata file cannot be deleted.
+   */
+  def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = {
+    if (origPath == null) {
+      throw new IllegalArgumentException("Unable to write Parquet metadata: path is null")
+    }
+    val fs = origPath.getFileSystem(conf)
+    if (fs == null) {
+      throw new IllegalArgumentException(
+        s"Unable to write Parquet metadata: path $origPath is incorrectly formatted")
+    }
+    val path = origPath.makeQualified(fs)
+    if (fs.exists(path) && !fs.getFileStatus(path).isDir) {
+      throw new IllegalArgumentException(s"Expected to write to directory $path but found file")
+    }
+    val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
+    if (fs.exists(metadataPath)) {
+      try {
+        fs.delete(metadataPath, true)
+      } catch {
+        case e: IOException =>
+          // Keep the underlying failure as the cause so it is not lost.
+          throw new IOException(
+            s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath", e)
+      }
+    }
+    val extraMetadata = new java.util.HashMap[String, String]()
+    extraMetadata.put(
+      RowReadSupport.SPARK_METADATA_KEY,
+      ParquetTypesConverter.convertToString(attributes))
+    // TODO: add extra data, e.g., table name, date, etc.?
+
+    val parquetSchema: MessageType =
+      ParquetTypesConverter.convertFromAttributes(attributes)
+    val metaData: FileMetaData = new FileMetaData(
+      parquetSchema,
+      extraMetadata,
+      "Spark")
+
+    ParquetRelation.enableLogForwarding()
+    ParquetFileWriter.writeMetadataFile(
+      conf,
+      path,
+      new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil)
+  }
+
+  /**
+   * Try to read Parquet metadata at the given Path. We first see if there is a summary file
+   * inside the given directory. If so, this is used. Else we read the footers of the Parquet
+   * files in that directory and return the first one.
+   * @param origPath The path at which we expect one (or more) Parquet files.
+   * @param configuration The Hadoop configuration to use; if None, one is obtained from a
+   *                      freshly created `Job`.
+   * @return The `ParquetMetadata` containing among other things the schema.
+   * @throws IllegalArgumentException if the path is null, its file system cannot be
+   *                                  resolved, it is not a directory, or no footer is found.
+   */
+  def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = {
+    if (origPath == null) {
+      throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
+    }
+    // getOrElse is by-name, so a Job is only created when no configuration was supplied.
+    val conf = configuration.getOrElse(ContextUtil.getConfiguration(new Job()))
+    val fs: FileSystem = origPath.getFileSystem(conf)
+    if (fs == null) {
+      throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
+    }
+    val path = origPath.makeQualified(fs)
+    if (!fs.getFileStatus(path).isDir) {
+      throw new IllegalArgumentException(
+        s"Expected $path to be a directory with Parquet files/metadata")
+    }
+    ParquetRelation.enableLogForwarding()
+    val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
+    // if this is a new table that was just created we will find only the metadata file
+    if (fs.exists(metadataPath) && fs.isFile(metadataPath)) {
+      ParquetFileReader.readFooter(conf, metadataPath)
+    } else {
+      // there may be one or more Parquet files in the given directory
+      val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path))
+      // TODO: for now we assume that all footers (if there is more than one) have identical
+      // metadata; we may want to add a check here at some point
+      if (footers.isEmpty) {
+        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")
+      }
+      footers(0).getParquetMetadata
+    }
+  }
+
+  /**
+   * Reads in Parquet Metadata from the given path and tries to extract the schema
+   * (Catalyst attributes) from the application-specific key-value map. If this
+   * is empty it falls back to converting from the Parquet file schema which
+   * may lead to an upcast of types (e.g., {byte, short} to int).
+   *
+   * @param origPath The path at which we expect one (or more) Parquet files.
+   * @param conf The Hadoop configuration to use (see `readMetaData` for the None case).
+   * @return A list of attributes that make up the schema.
+   */
+  def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = {
+    // Read the footer once and use it for both the key-value map and the fallback schema.
+    val fileMetaData: FileMetaData = readMetaData(origPath, conf).getFileMetaData
+    val keyValueMetadata: java.util.Map[String, String] = fileMetaData.getKeyValueMetaData
+    Option(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY)) match {
+      case Some(serializedSchema) =>
+        convertFromString(serializedSchema)
+      case None =>
+        val attributes = convertToAttributes(fileMetaData.getSchema)
+        log.warn(s"Falling back to schema conversion from Parquet types; result: $attributes")
+        attributes
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/f479cf37/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 9810520..0c239d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -26,15 +26,16 @@ import parquet.hadoop.ParquetFileWriter import parquet.hadoop.util.ContextUtil import parquet.schema.MessageTypeParser +import org.apache.spark.SparkContext import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.util.getTempFilePath import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType} import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.TestData import org.apache.spark.sql.SchemaRDD -import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.catalyst.expressions.Equals -import org.apache.spark.sql.catalyst.types.IntegerType +import org.apache.spark.sql.catalyst.util.getTempFilePath +import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, Star} import org.apache.spark.util.Utils // Implicits @@ -56,15 +57,37 @@ case class OptionalReflectData( doubleField: Option[Double], booleanField: Option[Boolean]) +case class Nested(i: Int, s: String) + +case class Data(array: Seq[Int], nested: Nested) + +case class AllDataTypes( + stringField: String, + intField: Int, + longField: Long, + floatField: Float, + doubleField: Double, + shortField: Short, + byteField: Byte, + booleanField: Boolean) + class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { import TestData._ TestData // Load test data tables. 
var testRDD: SchemaRDD = null + // TODO: remove this once SqlParser can parse nested select statements + var nestedParserSqlContext: NestedParserSQLContext = null + override def beforeAll() { + nestedParserSqlContext = new NestedParserSQLContext(TestSQLContext.sparkContext) ParquetTestData.writeFile() ParquetTestData.writeFilterFile() + ParquetTestData.writeNestedFile1() + ParquetTestData.writeNestedFile2() + ParquetTestData.writeNestedFile3() + ParquetTestData.writeNestedFile4() testRDD = parquetFile(ParquetTestData.testDir.toString) testRDD.registerAsTable("testsource") parquetFile(ParquetTestData.testFilterDir.toString) @@ -74,9 +97,33 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA override def afterAll() { Utils.deleteRecursively(ParquetTestData.testDir) Utils.deleteRecursively(ParquetTestData.testFilterDir) + Utils.deleteRecursively(ParquetTestData.testNestedDir1) + Utils.deleteRecursively(ParquetTestData.testNestedDir2) + Utils.deleteRecursively(ParquetTestData.testNestedDir3) + Utils.deleteRecursively(ParquetTestData.testNestedDir4) // here we should also unregister the table?? 
} + test("Read/Write All Types") { + val tempDir = getTempFilePath("parquetTest").getCanonicalPath + val range = (0 to 255) + TestSQLContext.sparkContext.parallelize(range) + .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0)) + .saveAsParquetFile(tempDir) + val result = parquetFile(tempDir).collect() + range.foreach { + i => + assert(result(i).getString(0) == s"$i", s"row $i String field did not match, got ${result(i).getString(0)}") + assert(result(i).getInt(1) === i) + assert(result(i).getLong(2) === i.toLong) + assert(result(i).getFloat(3) === i.toFloat) + assert(result(i).getDouble(4) === i.toDouble) + assert(result(i).getShort(5) === i.toShort) + assert(result(i).getByte(6) === i.toByte) + assert(result(i).getBoolean(7) === (i % 2 == 0)) + } + } + test("self-join parquet files") { val x = ParquetTestData.testData.as('x) val y = ParquetTestData.testData.as('y) @@ -154,7 +201,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA path, TestSQLContext.sparkContext.hadoopConfiguration) assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE))) - val metaData = ParquetTypesConverter.readMetaData(path) + val metaData = ParquetTypesConverter.readMetaData(path, Some(ContextUtil.getConfiguration(job))) assert(metaData != null) ParquetTestData .testData @@ -197,10 +244,37 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i") } Utils.deleteRecursively(file) - assert(true) } - test("insert (appending) to same table via Scala API") { + test("Insert (overwrite) via Scala API") { + val dirname = Utils.createTempDir() + val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + source_rdd.registerAsTable("source") + val dest_rdd = createParquetFile[TestRDDEntry](dirname.toString) + dest_rdd.registerAsTable("dest") + sql("INSERT 
OVERWRITE INTO dest SELECT * FROM source").collect() + val rdd_copy1 = sql("SELECT * FROM dest").collect() + assert(rdd_copy1.size === 100) + assert(rdd_copy1(0).apply(0) === 1) + assert(rdd_copy1(0).apply(1) === "val_1") + // TODO: why does collecting break things? It seems InsertIntoParquet::execute() is + // executed twice otherwise?! + sql("INSERT INTO dest SELECT * FROM source") + val rdd_copy2 = sql("SELECT * FROM dest").collect() + assert(rdd_copy2.size === 200) + assert(rdd_copy2(0).apply(0) === 1) + assert(rdd_copy2(0).apply(1) === "val_1") + assert(rdd_copy2(99).apply(0) === 100) + assert(rdd_copy2(99).apply(1) === "val_100") + assert(rdd_copy2(100).apply(0) === 1) + assert(rdd_copy2(100).apply(1) === "val_1") + Utils.deleteRecursively(dirname) + } + + test("Insert (appending) to same table via Scala API") { + // TODO: why does collecting break things? It seems InsertIntoParquet::execute() is + // executed twice otherwise?! sql("INSERT INTO testsource SELECT * FROM testsource") val double_rdd = sql("SELECT * FROM testsource").collect() assert(double_rdd != null) @@ -363,4 +437,272 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10") assert(query.collect().size === 10) } + + test("Importing nested Parquet file (Addressbook)") { + val result = TestSQLContext + .parquetFile(ParquetTestData.testNestedDir1.toString) + .toSchemaRDD + .collect() + assert(result != null) + assert(result.size === 2) + val first_record = result(0) + val second_record = result(1) + assert(first_record != null) + assert(second_record != null) + assert(first_record.size === 3) + assert(second_record(1) === null) + assert(second_record(2) === null) + assert(second_record(0) === "A. 
Nonymous") + assert(first_record(0) === "Julien Le Dem") + val first_owner_numbers = first_record(1) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]] + val first_contacts = first_record(2) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]] + assert(first_owner_numbers != null) + assert(first_owner_numbers(0) === "555 123 4567") + assert(first_owner_numbers(2) === "XXX XXX XXXX") + assert(first_contacts(0) + .asInstanceOf[CatalystConverter.StructScalaType[_]].size === 2) + val first_contacts_entry_one = first_contacts(0) + .asInstanceOf[CatalystConverter.StructScalaType[_]] + assert(first_contacts_entry_one(0) === "Dmitriy Ryaboy") + assert(first_contacts_entry_one(1) === "555 987 6543") + val first_contacts_entry_two = first_contacts(1) + .asInstanceOf[CatalystConverter.StructScalaType[_]] + assert(first_contacts_entry_two(0) === "Chris Aniszczyk") + } + + test("Importing nested Parquet file (nested numbers)") { + val result = TestSQLContext + .parquetFile(ParquetTestData.testNestedDir2.toString) + .toSchemaRDD + .collect() + assert(result.size === 1, "number of top-level rows incorrect") + assert(result(0).size === 5, "number of fields in row incorrect") + assert(result(0)(0) === 1) + assert(result(0)(1) === 7) + val subresult1 = result(0)(2).asInstanceOf[CatalystConverter.ArrayScalaType[_]] + assert(subresult1.size === 3) + assert(subresult1(0) === (1.toLong << 32)) + assert(subresult1(1) === (1.toLong << 33)) + assert(subresult1(2) === (1.toLong << 34)) + val subresult2 = result(0)(3) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) + .asInstanceOf[CatalystConverter.StructScalaType[_]] + assert(subresult2.size === 2) + assert(subresult2(0) === 2.5) + assert(subresult2(1) === false) + val subresult3 = result(0)(4) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]] + assert(subresult3.size === 2) + assert(subresult3(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]].size === 2) + val subresult4 = 
subresult3(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]] + assert(subresult4(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7) + assert(subresult4(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8) + assert(subresult3(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]].size === 1) + assert(subresult3(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9) + } + + test("Simple query on addressbook") { + val data = TestSQLContext + .parquetFile(ParquetTestData.testNestedDir1.toString) + .toSchemaRDD + val tmp = data.where('owner === "Julien Le Dem").select('owner as 'a, 'contacts as 'c).collect() + assert(tmp.size === 1) + assert(tmp(0)(0) === "Julien Le Dem") + } + + test("Projection in addressbook") { + val data = nestedParserSqlContext + .parquetFile(ParquetTestData.testNestedDir1.toString) + .toSchemaRDD + data.registerAsTable("data") + val query = nestedParserSqlContext.sql("SELECT owner, contacts[1].name FROM data") + val tmp = query.collect() + assert(tmp.size === 2) + assert(tmp(0).size === 2) + assert(tmp(0)(0) === "Julien Le Dem") + assert(tmp(0)(1) === "Chris Aniszczyk") + assert(tmp(1)(0) === "A. 
Nonymous") + assert(tmp(1)(1) === null) + } + + test("Simple query on nested int data") { + val data = nestedParserSqlContext + .parquetFile(ParquetTestData.testNestedDir2.toString) + .toSchemaRDD + data.registerAsTable("data") + val result1 = nestedParserSqlContext.sql("SELECT entries[0].value FROM data").collect() + assert(result1.size === 1) + assert(result1(0).size === 1) + assert(result1(0)(0) === 2.5) + val result2 = nestedParserSqlContext.sql("SELECT entries[0] FROM data").collect() + assert(result2.size === 1) + val subresult1 = result2(0)(0).asInstanceOf[CatalystConverter.StructScalaType[_]] + assert(subresult1.size === 2) + assert(subresult1(0) === 2.5) + assert(subresult1(1) === false) + val result3 = nestedParserSqlContext.sql("SELECT outerouter FROM data").collect() + val subresult2 = result3(0)(0) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]] + assert(subresult2(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7) + assert(subresult2(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8) + assert(result3(0)(0) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](1) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9) + } + + test("nested structs") { + val data = nestedParserSqlContext + .parquetFile(ParquetTestData.testNestedDir3.toString) + .toSchemaRDD + data.registerAsTable("data") + val result1 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[0].value[0].truth FROM data").collect() + assert(result1.size === 1) + assert(result1(0).size === 1) + assert(result1(0)(0) === false) + val result2 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[0].value[1].truth FROM data").collect() + assert(result2.size === 1) + assert(result2(0).size === 1) + assert(result2(0)(0) === true) + val result3 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[1].value[0].truth FROM data").collect() + 
assert(result3.size === 1) + assert(result3(0).size === 1) + assert(result3(0)(0) === false) + } + + test("simple map") { + val data = TestSQLContext + .parquetFile(ParquetTestData.testNestedDir4.toString) + .toSchemaRDD + data.registerAsTable("mapTable") + val result1 = sql("SELECT data1 FROM mapTable").collect() + assert(result1.size === 1) + assert(result1(0)(0) + .asInstanceOf[CatalystConverter.MapScalaType[String, _]] + .getOrElse("key1", 0) === 1) + assert(result1(0)(0) + .asInstanceOf[CatalystConverter.MapScalaType[String, _]] + .getOrElse("key2", 0) === 2) + val result2 = sql("""SELECT data1["key1"] FROM mapTable""").collect() + assert(result2(0)(0) === 1) + } + + test("map with struct values") { + val data = nestedParserSqlContext + .parquetFile(ParquetTestData.testNestedDir4.toString) + .toSchemaRDD + data.registerAsTable("mapTable") + val result1 = nestedParserSqlContext.sql("SELECT data2 FROM mapTable").collect() + assert(result1.size === 1) + val entry1 = result1(0)(0) + .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]] + .getOrElse("seven", null) + assert(entry1 != null) + assert(entry1(0) === 42) + assert(entry1(1) === "the answer") + val entry2 = result1(0)(0) + .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]] + .getOrElse("eight", null) + assert(entry2 != null) + assert(entry2(0) === 49) + assert(entry2(1) === null) + val result2 = nestedParserSqlContext.sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM mapTable""").collect() + assert(result2.size === 1) + assert(result2(0)(0) === 42.toLong) + assert(result2(0)(1) === "the answer") + } + + test("Writing out Addressbook and reading it back in") { + // TODO: find out why CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME + // has no effect in this test case + val tmpdir = Utils.createTempDir() + Utils.deleteRecursively(tmpdir) + val result = nestedParserSqlContext + 
.parquetFile(ParquetTestData.testNestedDir1.toString) + .toSchemaRDD + result.saveAsParquetFile(tmpdir.toString) + nestedParserSqlContext + .parquetFile(tmpdir.toString) + .toSchemaRDD + .registerAsTable("tmpcopy") + val tmpdata = nestedParserSqlContext.sql("SELECT owner, contacts[1].name FROM tmpcopy").collect() + assert(tmpdata.size === 2) + assert(tmpdata(0).size === 2) + assert(tmpdata(0)(0) === "Julien Le Dem") + assert(tmpdata(0)(1) === "Chris Aniszczyk") + assert(tmpdata(1)(0) === "A. Nonymous") + assert(tmpdata(1)(1) === null) + Utils.deleteRecursively(tmpdir) + } + + test("Writing out Map and reading it back in") { + val data = nestedParserSqlContext + .parquetFile(ParquetTestData.testNestedDir4.toString) + .toSchemaRDD + val tmpdir = Utils.createTempDir() + Utils.deleteRecursively(tmpdir) + data.saveAsParquetFile(tmpdir.toString) + nestedParserSqlContext + .parquetFile(tmpdir.toString) + .toSchemaRDD + .registerAsTable("tmpmapcopy") + val result1 = nestedParserSqlContext.sql("""SELECT data1["key2"] FROM tmpmapcopy""").collect() + assert(result1.size === 1) + assert(result1(0)(0) === 2) + val result2 = nestedParserSqlContext.sql("SELECT data2 FROM tmpmapcopy").collect() + assert(result2.size === 1) + val entry1 = result2(0)(0) + .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]] + .getOrElse("seven", null) + assert(entry1 != null) + assert(entry1(0) === 42) + assert(entry1(1) === "the answer") + val entry2 = result2(0)(0) + .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]] + .getOrElse("eight", null) + assert(entry2 != null) + assert(entry2(0) === 49) + assert(entry2(1) === null) + val result3 = nestedParserSqlContext.sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM tmpmapcopy""").collect() + assert(result3.size === 1) + assert(result3(0)(0) === 42.toLong) + assert(result3(0)(1) === "the answer") + Utils.deleteRecursively(tmpdir) + } +} + +// TODO: the 
code below is needed temporarily until the standard parser is able to parse
+// nested field expressions correctly
+/**
+ * A SQLContext whose `parser` is replaced by a [[NestedSqlParser]], so that queries such as
+ * `SELECT owner, contacts[1].name FROM data` can address nested fields.
+ */
+class NestedParserSQLContext(@transient override val sparkContext: SparkContext) extends SQLContext(sparkContext) {
+  override protected[sql] val parser = new NestedSqlParser()
+}
+
+/**
+ * Lexer used by [[NestedSqlParser]]: identifier characters are restricted to letters and `_`,
+ * and `.` is registered as a delimiter so that it is emitted as a separate token.
+ * NOTE(review): presumably the default SqlLexical accepts `.` as an identifier character,
+ * which is why this override is needed — confirm against SqlLexical.
+ */
+class NestedSqlLexical(override val keywords: Seq[String]) extends SqlLexical(keywords) {
+  override def identChar = letter | elem('_')
+  delimiters += (".")
+}
+
+/**
+ * SqlParser variant whose base expressions additionally accept ordinal access
+ * (`expr[ordinal]`, parsed into `GetItem`) and field access (`expr.name`, parsed into
+ * `GetField`), using [[NestedSqlLexical]] for tokenization.
+ */
+class NestedSqlParser extends SqlParser {
+  override val lexical = new NestedSqlLexical(reservedWords)
+
+  // Alternatives are tried in the order written, so the order below matters.
+  // The GetItem and GetField rules begin with `expression`; presumably `expression`
+  // can lead back to `baseExpression`, i.e. the grammar is left-recursive, which packrat
+  // parsing (PackratParser) is able to handle. `literal` is the final fallback.
+  override protected lazy val baseExpression: PackratParser[Expression] =
+    expression ~ "[" ~ expression <~ "]" ^^ {
+      case base ~ _ ~ ordinal => GetItem(base, ordinal)
+    } |
+    expression ~ "." ~ ident ^^ {
+      case base ~ _ ~ fieldName => GetField(base, fieldName)
+    } |
+    TRUE ^^^ Literal(true, BooleanType) |
+    FALSE ^^^ Literal(false, BooleanType) |
+    cast |
+    "(" ~> expression <~ ")" |
+    function |
+    "-" ~> literal ^^ UnaryMinus |
+    ident ^^ UnresolvedAttribute |
+    "*" ^^^ Star(None) |
+    literal
 }
http://git-wip-us.apache.org/repos/asf/spark/blob/f479cf37/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6828434..f923d68 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -208,7 +208,9 @@ object HiveMetastoreTypes extends RegexParsers {
   }
 
   protected lazy val structType: Parser[DataType] =
-    "struct" ~> "<" ~> repsep(structField,",") <~ ">" ^^ StructType
+    "struct" ~> "<" ~> repsep(structField,",") <~ ">" ^^ {
+      case fields => new StructType(fields)
+    }
 
   protected lazy val dataType: Parser[DataType] =
     arrayType |
