[SPARK-11692][SQL] Support for Parquet logical types, JSON and BSON (embedded 
types)

Parquet supports some JSON and BSON datatypes. They are represented as binary 
for BSON and string (UTF-8) for JSON internally.

I searched a bit and found that Apache Drill also supports both in this way, 
[link](https://drill.apache.org/docs/parquet-format/).

Author: hyukjinkwon <[email protected]>
Author: Hyukjin Kwon <[email protected]>

Closes #9658 from HyukjinKwon/SPARK-11692.

(cherry picked from commit e388b39d10fc269cdd3d630ea7d4ae80fd0efa97)
Signed-off-by: Michael Armbrust <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e822425a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e822425a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e822425a

Branch: refs/heads/branch-1.6
Commit: e822425ae718dc1b8e51dd69eff38314ecb0c629
Parents: e8390e1
Author: hyukjinkwon <[email protected]>
Authored: Mon Nov 16 21:59:33 2015 +0800
Committer: Michael Armbrust <[email protected]>
Committed: Wed Nov 18 15:03:37 2015 -0800

----------------------------------------------------------------------
 .../parquet/CatalystSchemaConverter.scala       |  3 ++-
 .../datasources/parquet/ParquetIOSuite.scala    | 25 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e822425a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index f28a18e..5f9f908 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -170,9 +170,10 @@ private[parquet] class CatalystSchemaConverter(
 
       case BINARY =>
         originalType match {
-          case UTF8 | ENUM => StringType
+          case UTF8 | ENUM | JSON => StringType
           case null if assumeBinaryIsString => StringType
           case null => BinaryType
+          case BSON => BinaryType
           case DECIMAL => makeDecimalType()
           case _ => illegalType()
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/e822425a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 29a5282..1f2831b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -251,6 +251,31 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
     }
   }
 
+  test("SPARK-11692 Support for Parquet logical types, JSON and BSON (embedded 
types)") {
+    val parquetSchema = MessageTypeParser.parseMessageType(
+      """message root {
+        |  required binary a(JSON);
+        |  required binary b(BSON);
+        |}
+      """.stripMargin)
+
+    withTempPath { location =>
+      val extraMetadata = Map.empty[String, String].asJava
+      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, 
"Spark")
+      val path = new Path(location.getCanonicalPath)
+      val footer = List(
+        new Footer(path, new ParquetMetadata(fileMetadata, 
Collections.emptyList()))
+      ).asJava
+
+      ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, 
path, footer)
+
+      val jsonDataType = 
sqlContext.read.parquet(path.toString).schema(0).dataType
+      assert(jsonDataType === StringType)
+      val bsonDataType = 
sqlContext.read.parquet(path.toString).schema(1).dataType
+      assert(bsonDataType === BinaryType)
+    }
+  }
+
   test("compression codec") {
     def compressionCodecFor(path: String, codecName: String): String = {
       val codecs = for {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to