Repository: spark
Updated Branches:
  refs/heads/branch-1.3 df83e2197 -> b75943f66


[SPARK-6315] [SQL] Also tries the case class string parser while reading Parquet schema

When writing Parquet files, Spark 1.1.x persisted the schema string produced by `StructType.toString` into the Parquet key-value metadata. Spark 1.2 deprecated that format in favor of a schema string in JSON format, but we still need to handle the old format when reading Parquet files written by older versions.
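
In essence, the fix chains the two parsers with scala.util.Try, so a failure of the JSON parser falls through to the deprecated case class string parser. A minimal sketch of the idea (simplified from the patch below, which additionally logs each fallback; note these parser methods are package-private, so this would only compile inside the org.apache.spark.sql package):

    import scala.util.Try
    import org.apache.spark.sql.types.{DataType, StructType}

    // Try the JSON parser first; on any failure, retry with the deprecated
    // case class string parser; return None only if both fail.
    def parseSchema(serialized: String): Option[StructType] =
      Try(DataType.fromJson(serialized))
        .orElse(Try(DataType.fromCaseClassString(serialized)))
        .map(_.asInstanceOf[StructType])
        .toOption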


Author: Cheng Lian <l...@databricks.com>

Closes #5034 from liancheng/spark-6315 and squashes the following commits:

a182f58 [Cheng Lian] Adds a regression test
b9c6dbe [Cheng Lian] Also tries the case class string parser while reading Parquet schema

(cherry picked from commit 937c1e5503963e67a5412be993d30dbec6fc9883)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b75943f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b75943f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b75943f6

Branch: refs/heads/branch-1.3
Commit: b75943f661b74c59c19847430663450bf57cfd8e
Parents: df83e21
Author: Cheng Lian <l...@databricks.com>
Authored: Sat Mar 21 11:18:45 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Sat Mar 21 11:19:41 2015 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 23 ++++++++++-
 .../spark/sql/parquet/ParquetIOSuite.scala      | 42 ++++++++++++++++++--
 2 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b75943f6/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e82f7ad..3ee4015 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -662,7 +662,7 @@ private[sql] case class ParquetRelation2(
   }
 }
 
-private[sql] object ParquetRelation2 {
+private[sql] object ParquetRelation2 extends Logging {
   // Whether we should merge schemas collected from all Parquet part-files.
   val MERGE_SCHEMA = "mergeSchema"
 
@@ -682,7 +682,26 @@ private[sql] object ParquetRelation2 {
         .getKeyValueMetaData
         .toMap
         .get(RowReadSupport.SPARK_METADATA_KEY)
-        .map(DataType.fromJson(_).asInstanceOf[StructType])
+        .flatMap { serializedSchema =>
+        .flatMap { serializedSchema =>
+          // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
+          // whatever is available.
+          Try(DataType.fromJson(serializedSchema))
+            .recover { case _: Throwable =>
+              logInfo(
+                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+                  "falling back to the deprecated DataType.fromCaseClassString parser.")
+              DataType.fromCaseClassString(serializedSchema)
+            }
+            .recover { case cause: Throwable =>
+              logWarning(
+                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+                   |\t$serializedSchema
+                 """.stripMargin,
+                cause)
+            }
+            .map(_.asInstanceOf[StructType])
+            .toOption
+        }
 
       maybeSparkSchema.getOrElse {
         // Falls back to Parquet schema if Spark SQL schema is absent.

http://git-wip-us.apache.org/repos/asf/spark/blob/b75943f6/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index a70b3c7..5438095 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -28,8 +28,8 @@ import parquet.example.data.simple.SimpleGroup
 import parquet.example.data.{Group, GroupWriter}
 import parquet.hadoop.api.WriteSupport
 import parquet.hadoop.api.WriteSupport.WriteContext
-import parquet.hadoop.metadata.CompressionCodecName
-import parquet.hadoop.{ParquetFileWriter, ParquetWriter}
+import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
+import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
 import parquet.io.api.RecordConsumer
 import parquet.schema.{MessageType, MessageTypeParser}
 
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.implicits._
-import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
 
 // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
@@ -330,6 +330,42 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     }
   }
 
+  test("SPARK-6315 regression test") {
+    // Spark 1.1 and prior versions write Spark schema as case class string 
into Parquet metadata.
+    // This has been deprecated by JSON format since 1.2.  Notice that, 1.3 
further refactored data
+    // types API, and made StructType.fields an array.  This makes the result 
of StructType.toString
+    // different from prior versions: there's no "Seq" wrapping the fields 
part in the string now.
+    val sparkSchema =
+      
"StructType(Seq(StructField(a,BooleanType,false),StructField(b,IntegerType,false)))"
+
+    // The Parquet schema is intentionally made different from the Spark 
schema.  Because the new
+    // Parquet data source simply falls back to the Parquet schema once it 
fails to parse the Spark
+    // schema.  By making these two different, we are able to assert the old 
style case class string
+    // is parsed successfully.
+    val parquetSchema = MessageTypeParser.parseMessageType(
+      """message root {
+        |  required int32 c;
+        |}
+      """.stripMargin)
+
+    withTempPath { location =>
+      val extraMetadata = Map(RowReadSupport.SPARK_METADATA_KEY -> 
sparkSchema.toString)
+      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, 
"Spark")
+      val path = new Path(location.getCanonicalPath)
+
+      ParquetFileWriter.writeMetadataFile(
+        sparkContext.hadoopConfiguration,
+        path,
+        new Footer(path, new ParquetMetadata(fileMetadata, Nil)) :: Nil)
+
+      assertResult(parquetFile(path.toString).schema) {
+        StructType(
+          StructField("a", BooleanType, nullable = false) ::
+          StructField("b", IntegerType, nullable = false) ::
+          Nil)
+      }
+    }
+  }
 }
 
 class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
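
For context, the comments in the regression test above describe how the serialized schema format changed across releases. Roughly, the two forms the reader must now accept look like this (an illustrative sketch for the two-field schema asserted in the test; the exact JSON key order is an assumption):

    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("a", BooleanType, nullable = false),
      StructField("b", IntegerType, nullable = false)))

    // Spark 1.1.x persisted StructType.toString, e.g.:
    //   StructType(Seq(StructField(a,BooleanType,false),StructField(b,IntegerType,false)))
    // Spark 1.2+ persists the JSON form (schema.json), e.g.:
    //   {"type":"struct","fields":[
    //     {"name":"a","type":"boolean","nullable":false,"metadata":{}},
    //     {"name":"b","type":"integer","nullable":false,"metadata":{}}]}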

