This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 75860a3cc50f [SPARK-49743][SQL] OptimizeCsvJsonExpr should not change 
schema fields when pruning GetArrayStructFields
75860a3cc50f is described below

commit 75860a3cc50f366403d396a27eaa726f6860519a
Author: Nikhil Sheoran <[email protected]>
AuthorDate: Tue Oct 1 09:49:00 2024 +0800

    [SPARK-49743][SQL] OptimizeCsvJsonExpr should not change schema fields when 
pruning GetArrayStructFields
    
    ### What changes were proposed in this pull request?
    - Cherry-pick of the original PR - 
https://github.com/apache/spark/pull/48190 to Spark 3.5
    
    - When pruning the schema of the struct in `GetArrayStructFields`, rely on 
the existing `StructType` to obtain the pruned schema instead of using the 
accessed field.
    
    ### Why are the changes needed?
    
    - Fixes a bug in `OptimizeCsvJsonExprs` rule that would have otherwise 
changed the schema fields of the underlying struct to be extracted.
    - This would show up as a correctness issue: instead of picking the right 
values for a field, we would have ended up returning null output.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. The query output would change for the queries of the following type:
    ```
    SELECT
      from_json('[{"a": '||id||', "b": '|| (2*id) ||'}]', 'array<struct<a: INT, 
b: INT>>').a,
      from_json('[{"a": '||id||', "b": '|| (2*id) ||'}]', 'array<struct<a: INT, 
b: INT>>').A
    FROM
      range(3) as t
    ```
    
    Earlier, the result would have been:
    ```
    Array([ArraySeq(0),ArraySeq(null)], [ArraySeq(1),ArraySeq(null)], 
[ArraySeq(2),ArraySeq(null)])
    ```
    whereas the new result is (verified through spark-shell):
    ```
    Array([ArraySeq(0),ArraySeq(0)], [ArraySeq(1),ArraySeq(1)], 
[ArraySeq(2),ArraySeq(2)])
    ```
    
    ### How was this patch tested?
    - Added unit tests.
    - Without this change, the added test would fail as we would have modified 
the schema from `a` to `A`:
    ```
    - SPARK-49743: prune unnecessary columns from GetArrayStructFields does not 
change schema *** FAILED ***
      == FAIL: Plans do not match ===
      !Project 
[from_json(ArrayType(StructType(StructField(A,IntegerType,true)),true), json#0, 
Some(America/Los_Angeles)).A AS a#0]   Project [from_json(ArrayType(StructType(S
    tructField(a,IntegerType,true)),true), json#0, Some(America/Los_Angeles)).A 
AS a#0]
       +- LocalRelation <empty>, [json#0]                                       
                                                      +- LocalRelation <empty>, 
[json#0] (PlanT
    est.scala:179)
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48308 from nikhilsheoran-db/SPARK-49743-3.5.
    
    Authored-by: Nikhil Sheoran 
<[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala   |  7 ++++---
 .../sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala | 17 +++++++++++++++++
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 13 +++++++++++++
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala
index 4347137bf68b..04cc230f99b4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala
@@ -112,9 +112,10 @@ object OptimizeCsvJsonExprs extends Rule[LogicalPlan] {
       val prunedSchema = StructType(Array(schema(ordinal)))
       g.copy(child = j.copy(schema = prunedSchema), ordinal = 0)
 
-    case g @ GetArrayStructFields(j @ JsonToStructs(schema: ArrayType, _, _, 
_), _, _, _, _)
-        if schema.elementType.asInstanceOf[StructType].length > 1 && 
j.options.isEmpty =>
-      val prunedSchema = ArrayType(StructType(Array(g.field)), g.containsNull)
+    case g @ GetArrayStructFields(j @ JsonToStructs(ArrayType(schema: 
StructType, _),
+        _, _, _), _, ordinal, _, _) if schema.length > 1 && j.options.isEmpty 
=>
+      // Obtain the pruned schema by picking the `ordinal` field of the struct.
+      val prunedSchema = ArrayType(StructType(Array(schema(ordinal))), 
g.containsNull)
       g.copy(child = j.copy(schema = prunedSchema), ordinal = 0, numFields = 1)
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala
index c185de4c05d8..eed06da609f8 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala
@@ -307,4 +307,21 @@ class OptimizeJsonExprsSuite extends PlanTest with 
ExpressionEvalHelper {
       comparePlans(optimized, query.analyze)
     }
   }
+
+  test("SPARK-49743: prune unnecessary columns from GetArrayStructFields does 
not change schema") {
+    val options = Map.empty[String, String]
+    val schema = ArrayType(StructType.fromDDL("a int, b int"), containsNull = 
true)
+
+    val field = StructField("A", IntegerType) // Instead of "a", use "A" to 
test case sensitivity.
+    val query = testRelation2
+      .select(GetArrayStructFields(
+        JsonToStructs(schema, options, $"json"), field, 0, 2, true).as("a"))
+    val optimized = Optimizer.execute(query.analyze)
+
+    val prunedSchema = ArrayType(StructType.fromDDL("a int"), containsNull = 
true)
+    val expected = testRelation2
+      .select(GetArrayStructFields(
+        JsonToStructs(prunedSchema, options, $"json"), field, 0, 1, 
true).as("a")).analyze
+    comparePlans(optimized, expected)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 71f4f17de61d..793a0da6a862 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4711,6 +4711,19 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
     val df6 = df3.join(df2, col("df3.zaak_id") === col("df2.customer_id"), 
"outer")
     df5.crossJoin(df6)
   }
+
+  test("SPARK-49743: OptimizeCsvJsonExpr does not change schema when pruning 
struct") {
+    val df = sql("""
+        | SELECT
+        |    from_json('[{"a": '||id||', "b": '|| (2*id) ||'}]', 
'array<struct<a: INT, b: INT>>').a,
+        |    from_json('[{"a": '||id||', "b": '|| (2*id) ||'}]', 
'array<struct<a: INT, b: INT>>').A
+        | FROM
+        |    range(3) as t
+        |""".stripMargin)
+    val expectedAnswer = Seq(
+      Row(Array(0), Array(0)), Row(Array(1), Array(1)), Row(Array(2), 
Array(2)))
+    checkAnswer(df, expectedAnswer)
+  }
 }
 
 case class Foo(bar: Option[String])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to