Repository: spark
Updated Branches:
  refs/heads/branch-2.3 bfe50b684 -> 61e7bc0c1


[SPARK-24169][SQL] JsonToStructs should not access SQLConf at executor side

## What changes were proposed in this pull request?

This PR is extracted from #21190, to make it easier to backport.

`JsonToStructs` can be serialized to executors and evaluated there, so we should not call
`SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)` in the class body.

## How was this patch tested?

tested in #21190

Author: Wenchen Fan <[email protected]>

Closes #21226 from cloud-fan/minor4.

(cherry picked from commit 96a50016bb0fb1cc57823a6706bff2467d671efd)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61e7bc0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61e7bc0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61e7bc0c

Branch: refs/heads/branch-2.3
Commit: 61e7bc0c145b0da3129e1dac46d72cf0db5e1d94
Parents: bfe50b6
Author: Wenchen Fan <[email protected]>
Authored: Thu May 3 23:36:09 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu May 3 23:36:59 2018 +0800

----------------------------------------------------------------------
 .../catalyst/analysis/FunctionRegistry.scala    |  4 +-
 .../catalyst/expressions/jsonExpressions.scala  | 16 +++--
 .../expressions/JsonExpressionsSuite.scala      | 76 ++++++++++----------
 .../scala/org/apache/spark/sql/functions.scala  |  2 +-
 .../sql-tests/results/json-functions.sql.out    |  4 +-
 5 files changed, 54 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/61e7bc0c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 747016b..f3dfd69 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -526,7 +526,9 @@ object FunctionRegistry {
         // Otherwise, find a constructor method that matches the number of 
arguments, and use that.
         val params = Seq.fill(expressions.size)(classOf[Expression])
         val f = constructors.find(_.getParameterTypes.toSeq == 
params).getOrElse {
-          val validParametersCount = 
constructors.map(_.getParameterCount).distinct.sorted
+          val validParametersCount = constructors
+            .filter(_.getParameterTypes.forall(_ == classOf[Expression]))
+            .map(_.getParameterCount).distinct.sorted
           val expectedNumberOfParameters = if (validParametersCount.length == 
1) {
             validParametersCount.head.toString
           } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/61e7bc0c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index fdd672c..34161f0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -514,11 +514,10 @@ case class JsonToStructs(
     schema: DataType,
     options: Map[String, String],
     child: Expression,
-    timeZoneId: Option[String] = None)
+    timeZoneId: Option[String],
+    forceNullableSchema: Boolean)
   extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback 
with ExpectsInputTypes {
 
-  val forceNullableSchema = 
SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)
-
   // The JSON input data might be missing certain fields. We force the 
nullability
   // of the user-provided schema to avoid data corruptions. In particular, the 
parquet-mr encoder
   // can generate incorrect files if values are missing in columns declared as 
non-nullable.
@@ -532,14 +531,21 @@ case class JsonToStructs(
       schema = JsonExprUtils.validateSchemaLiteral(schema),
       options = Map.empty[String, String],
       child = child,
-      timeZoneId = None)
+      timeZoneId = None,
+      forceNullableSchema = 
SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
 
   def this(child: Expression, schema: Expression, options: Expression) =
     this(
       schema = JsonExprUtils.validateSchemaLiteral(schema),
       options = JsonExprUtils.convertToMapData(options),
       child = child,
-      timeZoneId = None)
+      timeZoneId = None,
+      forceNullableSchema = 
SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
+
+  // Used in `org.apache.spark.sql.functions`
+  def this(schema: DataType, options: Map[String, String], child: Expression) =
+    this(schema, options, child, timeZoneId = None,
+      forceNullableSchema = 
SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
 
   override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
     case _: StructType | ArrayType(_: StructType, _) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/61e7bc0c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 7812319..00e9763 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -392,7 +392,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
     val jsonData = """{"a": 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
+      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
       InternalRow(1)
     )
   }
@@ -401,13 +401,13 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
     val jsonData = """{"a" 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
+      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
       null
     )
 
     // Other modes should still return `null`.
     checkEvaluation(
-      JsonToStructs(schema, Map("mode" -> PermissiveMode.name), 
Literal(jsonData), gmtId),
+      JsonToStructs(schema, Map("mode" -> PermissiveMode.name), 
Literal(jsonData), gmtId, true),
       null
     )
   }
@@ -416,62 +416,62 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
     val input = """[{"a": 1}, {"a": 2}]"""
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = InternalRow(1) :: InternalRow(2) :: Nil
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, 
true), output)
   }
 
   test("from_json - input=object, schema=array, output=array of single row") {
     val input = """{"a": 1}"""
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = InternalRow(1) :: Nil
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, 
true), output)
   }
 
   test("from_json - input=empty array, schema=array, output=empty array") {
     val input = "[ ]"
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = Nil
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, 
true), output)
   }
 
   test("from_json - input=empty object, schema=array, output=array of single 
row with null") {
     val input = "{ }"
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = InternalRow(null) :: Nil
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, 
true), output)
   }
 
   test("from_json - input=array of single object, schema=struct, output=single 
row") {
     val input = """[{"a": 1}]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = InternalRow(1)
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, 
true), output)
   }
 
   test("from_json - input=array, schema=struct, output=null") {
     val input = """[{"a": 1}, {"a": 2}]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = null
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, 
true), output)
   }
 
   test("from_json - input=empty array, schema=struct, output=null") {
     val input = """[]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = null
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, 
true), output)
   }
 
   test("from_json - input=empty object, schema=struct, output=single row with 
null") {
     val input = """{  }"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = InternalRow(null)
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, 
true), output)
   }
 
   test("from_json null input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal.create(null, StringType), 
gmtId),
+      JsonToStructs(schema, Map.empty, Literal.create(null, StringType), 
gmtId, true),
       null
     )
   }
@@ -479,7 +479,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
   test("SPARK-20549: from_json bad UTF-8") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal(badJson), gmtId),
+      JsonToStructs(schema, Map.empty, Literal(badJson), gmtId, true),
       null)
   }
 
@@ -491,14 +491,14 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
     c.set(2016, 0, 1, 0, 0, 0)
     c.set(Calendar.MILLISECOND, 123)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId),
+      JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId, true),
       InternalRow(c.getTimeInMillis * 1000L)
     )
     // The result doesn't change because the json string includes timezone 
string ("Z" here),
     // which means the string represents the timestamp string in the timezone 
regardless of
     // the timeZoneId parameter.
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")),
+      JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST"), 
true),
       InternalRow(c.getTimeInMillis * 1000L)
     )
 
@@ -512,7 +512,8 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
           schema,
           Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
           Literal(jsonData2),
-          Option(tz.getID)),
+          Option(tz.getID),
+          true),
         InternalRow(c.getTimeInMillis * 1000L)
       )
       checkEvaluation(
@@ -521,7 +522,8 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
           Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
             DateTimeUtils.TIMEZONE_OPTION -> tz.getID),
           Literal(jsonData2),
-          gmtId),
+          gmtId,
+          true),
         InternalRow(c.getTimeInMillis * 1000L)
       )
     }
@@ -530,7 +532,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
   test("SPARK-19543: from_json empty input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
+      JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId, 
true),
       null
     )
   }
@@ -685,27 +687,23 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
 
   test("from_json missing fields") {
     for (forceJsonNullableSchema <- Seq(false, true)) {
-      withSQLConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA.key -> 
forceJsonNullableSchema.toString) {
-        val input =
-          """{
-            |  "a": 1,
-            |  "c": "foo"
-            |}
-            |""".stripMargin
-        val jsonSchema = new StructType()
-          .add("a", LongType, nullable = false)
-          .add("b", StringType, nullable = false)
-          .add("c", StringType, nullable = false)
-        val output = InternalRow(1L, null, UTF8String.fromString("foo"))
-        checkEvaluation(
-          JsonToStructs(jsonSchema, Map.empty, Literal.create(input, 
StringType), gmtId),
-          output
-        )
-        val schema = JsonToStructs(jsonSchema, Map.empty, 
Literal.create(input, StringType), gmtId)
-          .dataType
-        val schemaToCompare = if (forceJsonNullableSchema) 
jsonSchema.asNullable else jsonSchema
-        assert(schemaToCompare == schema)
-      }
+      val input =
+        """{
+          |  "a": 1,
+          |  "c": "foo"
+          |}
+          |""".stripMargin
+      val jsonSchema = new StructType()
+        .add("a", LongType, nullable = false)
+        .add("b", StringType, nullable = false)
+        .add("c", StringType, nullable = false)
+      val output = InternalRow(1L, null, UTF8String.fromString("foo"))
+      val expr = JsonToStructs(
+        jsonSchema, Map.empty, Literal.create(input, StringType), gmtId, 
forceJsonNullableSchema)
+      checkEvaluation(expr, output)
+      val schema = expr.dataType
+      val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable 
else jsonSchema
+      assert(schemaToCompare == schema)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/61e7bc0c/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index c9ca9a8..e31a9c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3132,7 +3132,7 @@ object functions {
    * @since 2.2.0
    */
   def from_json(e: Column, schema: DataType, options: Map[String, String]): 
Column = withExpr {
-    JsonToStructs(schema, options, e.expr)
+    new JsonToStructs(schema, options, e.expr)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/61e7bc0c/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index 581dddc..14a6912 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -129,7 +129,7 @@ select to_json()
 struct<>
 -- !query 12 output
 org.apache.spark.sql.AnalysisException
-Invalid number of arguments for function to_json. Expected: one of 1, 2 and 3; 
Found: 0; line 1 pos 7
+Invalid number of arguments for function to_json. Expected: one of 1 and 2; 
Found: 0; line 1 pos 7
 
 
 -- !query 13
@@ -225,7 +225,7 @@ select from_json()
 struct<>
 -- !query 21 output
 org.apache.spark.sql.AnalysisException
-Invalid number of arguments for function from_json. Expected: one of 2, 3 and 
4; Found: 0; line 1 pos 7
+Invalid number of arguments for function from_json. Expected: one of 2 and 3; 
Found: 0; line 1 pos 7
 
 
 -- !query 22


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to