Repository: spark
Updated Branches:
  refs/heads/branch-2.3 3ec25d5a8 -> b083bd107


[SPARK-23173][SQL] Avoid creating corrupt parquet files when loading data from 
JSON

## What changes were proposed in this pull request?

The from_json() function accepts an additional parameter, where the user might 
specify the schema. The issue is that the specified schema might not be 
compatible with data. In particular, the JSON data might be missing data for 
fields declared as non-nullable in the schema. The from_json() function does 
not verify the data against such errors. When data with missing fields is sent 
to the parquet encoder, there is no verification either. The end results is a 
corrupt parquet file.

To avoid corruptions, make sure that all fields in the user-specified schema 
are set to be nullable.
Since this changes the behavior of a public function, we need to include it in 
release notes.
The behavior can be reverted by setting 
`spark.sql.fromJsonForceNullableSchema=false`

## How was this patch tested?

Added two new tests.

Author: Michał Świtakowski <michal.switakow...@databricks.com>

Closes #20694 from mswit-databricks/SPARK-23173.

(cherry picked from commit 2ca9bb083c515511d2bfee271fc3e0269aceb9d5)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b083bd10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b083bd10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b083bd10

Branch: refs/heads/branch-2.3
Commit: b083bd107d25bd3f7a4cdcf3aafa07b9895878b6
Parents: 3ec25d5
Author: Michał Świtakowski <michal.switakow...@databricks.com>
Authored: Fri Mar 9 14:29:31 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Fri Mar 9 14:29:49 2018 -0800

----------------------------------------------------------------------
 .../catalyst/expressions/jsonExpressions.scala  | 22 ++++++++------
 .../org/apache/spark/sql/internal/SQLConf.scala |  8 ++++++
 .../expressions/JsonExpressionsSuite.scala      | 30 +++++++++++++++++++-
 .../datasources/parquet/ParquetIOSuite.scala    | 19 +++++++++++++
 4 files changed, 70 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b083bd10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 18b4fed..fdd672c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -30,6 +30,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.json._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
BadRecordException, FailFastMode, GenericArrayData, MapData}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -515,10 +516,15 @@ case class JsonToStructs(
     child: Expression,
     timeZoneId: Option[String] = None)
   extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback 
with ExpectsInputTypes {
-  override def nullable: Boolean = true
 
-  def this(schema: DataType, options: Map[String, String], child: Expression) =
-    this(schema, options, child, None)
+  val forceNullableSchema = 
SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)
+
+  // The JSON input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions. In particular, the 
parquet-mr encoder
+  // can generate incorrect files if values are missing in columns declared as 
non-nullable.
+  val nullableSchema = if (forceNullableSchema) schema.asNullable else schema
+
+  override def nullable: Boolean = true
 
   // Used in `FunctionRegistry`
   def this(child: Expression, schema: Expression) =
@@ -535,22 +541,22 @@ case class JsonToStructs(
       child = child,
       timeZoneId = None)
 
-  override def checkInputDataTypes(): TypeCheckResult = schema match {
+  override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
     case _: StructType | ArrayType(_: StructType, _) =>
       super.checkInputDataTypes()
     case _ => TypeCheckResult.TypeCheckFailure(
-      s"Input schema ${schema.simpleString} must be a struct or an array of 
structs.")
+      s"Input schema ${nullableSchema.simpleString} must be a struct or an 
array of structs.")
   }
 
   @transient
-  lazy val rowSchema = schema match {
+  lazy val rowSchema = nullableSchema match {
     case st: StructType => st
     case ArrayType(st: StructType, _) => st
   }
 
   // This converts parsed rows to the desired output by the given schema.
   @transient
-  lazy val converter = schema match {
+  lazy val converter = nullableSchema match {
     case _: StructType =>
       (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
     case ArrayType(_: StructType, _) =>
@@ -563,7 +569,7 @@ case class JsonToStructs(
       rowSchema,
       new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
 
-  override def dataType: DataType = schema
+  override def dataType: DataType = nullableSchema
 
   override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
     copy(timeZoneId = Option(timeZoneId))

http://git-wip-us.apache.org/repos/asf/spark/blob/b083bd10/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 47fcf34..eebb4c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -481,6 +481,14 @@ object SQLConf {
     .stringConf
     .createWithDefault("_corrupt_record")
 
+  val FROM_JSON_FORCE_NULLABLE_SCHEMA = 
buildConf("spark.sql.fromJsonForceNullableSchema")
+    .internal()
+    .doc("When true, force the output schema of the from_json() function to be 
nullable " +
+      "(including all the fields). Otherwise, the schema might not be 
compatible with" +
+      "actual data, which leads to curruptions.")
+    .booleanConf
+    .createWithDefault(true)
+
   val BROADCAST_TIMEOUT = buildConf("spark.sql.broadcastTimeout")
     .doc("Timeout in seconds for the broadcast wait time in broadcast joins.")
     .timeConf(TimeUnit.SECONDS)

http://git-wip-us.apache.org/repos/asf/spark/blob/b083bd10/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index a0bbe02..7812319 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -22,11 +22,13 @@ import java.util.Calendar
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.plans.PlanTestBase
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
DateTimeTestUtils, DateTimeUtils, GenericArrayData, PermissiveMode}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
-class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
+class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper 
with PlanTestBase {
   val json =
     """
       
|{"store":{"fruit":[{"weight":8,"type":"apple"},{"weight":9,"type":"pear"}],
@@ -680,4 +682,30 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
       )
     }
   }
+
+  test("from_json missing fields") {
+    for (forceJsonNullableSchema <- Seq(false, true)) {
+      withSQLConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA.key -> 
forceJsonNullableSchema.toString) {
+        val input =
+          """{
+            |  "a": 1,
+            |  "c": "foo"
+            |}
+            |""".stripMargin
+        val jsonSchema = new StructType()
+          .add("a", LongType, nullable = false)
+          .add("b", StringType, nullable = false)
+          .add("c", StringType, nullable = false)
+        val output = InternalRow(1L, null, UTF8String.fromString("foo"))
+        checkEvaluation(
+          JsonToStructs(jsonSchema, Map.empty, Literal.create(input, 
StringType), gmtId),
+          output
+        )
+        val schema = JsonToStructs(jsonSchema, Map.empty, 
Literal.create(input, StringType), gmtId)
+          .dataType
+        val schemaToCompare = if (forceJsonNullableSchema) 
jsonSchema.asNullable else jsonSchema
+        assert(schemaToCompare == schema)
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b083bd10/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index f3ece5b..e4e0e6e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, 
ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import 
org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -771,6 +772,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
       assert(option.compressionCodecClassName == "UNCOMPRESSED")
     }
   }
+
+  test("SPARK-23173 Writing a file with data converted from JSON with and 
incorrect user schema") {
+    withTempPath { file =>
+      val jsonData =
+        """{
+        |  "a": 1,
+        |  "c": "foo"
+        |}
+        |""".stripMargin
+      val jsonSchema = new StructType()
+        .add("a", LongType, nullable = false)
+        .add("b", StringType, nullable = false)
+        .add("c", StringType, nullable = false)
+      spark.range(1).select(from_json(lit(jsonData), jsonSchema) as "input")
+        .write.parquet(file.getAbsolutePath)
+      checkAnswer(spark.read.parquet(file.getAbsolutePath), Seq(Row(Row(1, 
null, "foo"))))
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: 
TaskAttemptContext)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to