This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 75860a3cc50f [SPARK-49743][SQL] OptimizeCsvJsonExpr should not change
schema fields when pruning GetArrayStructFields
75860a3cc50f is described below
commit 75860a3cc50f366403d396a27eaa726f6860519a
Author: Nikhil Sheoran <[email protected]>
AuthorDate: Tue Oct 1 09:49:00 2024 +0800
[SPARK-49743][SQL] OptimizeCsvJsonExpr should not change schema fields when
pruning GetArrayStructFields
### What changes were proposed in this pull request?
- Cherry-pick of the original PR -
https://github.com/apache/spark/pull/48190 to Spark 3.5
- When pruning the schema of the struct in `GetArrayStructFields`, rely on
the existing `StructType` to obtain the pruned schema instead of using the
accessed field.
### Why are the changes needed?
- Fixes a bug in `OptimizeCsvJsonExprs` rule that would have otherwise
changed the schema fields of the underlying struct to be extracted.
- This would have shown up as a correctness issue: for a field, instead of
picking the right values, we would have ended up producing null output.
### Does this PR introduce _any_ user-facing change?
Yes. The query output would change for the queries of the following type:
```
SELECT
from_json('[{"a": '||id||', "b": '|| (2*id) ||'}]', 'array<struct<a: INT,
b: INT>>').a,
from_json('[{"a": '||id||', "b": '|| (2*id) ||'}]', 'array<struct<a: INT,
b: INT>>').A
FROM
range(3) as t
```
Earlier, the result would have been:
```
Array([ArraySeq(0),ArraySeq(null)], [ArraySeq(1),ArraySeq(null)],
[ArraySeq(2),ArraySeq(null)])
```
vs the new result is (verified through spark-shell):
```
Array([ArraySeq(0),ArraySeq(0)], [ArraySeq(1),ArraySeq(1)],
[ArraySeq(2),ArraySeq(2)])
```
### How was this patch tested?
- Added unit tests.
- Without this change, the added test would fail as we would have modified
the schema from `a` to `A`:
```
- SPARK-49743: prune unnecessary columns from GetArrayStructFields does not
change schema *** FAILED ***
== FAIL: Plans do not match ===
!Project
[from_json(ArrayType(StructType(StructField(A,IntegerType,true)),true), json#0,
Some(America/Los_Angeles)).A AS a#0] Project [from_json(ArrayType(StructType(S
tructField(a,IntegerType,true)),true), json#0, Some(America/Los_Angeles)).A
AS a#0]
+- LocalRelation <empty>, [json#0]
+- LocalRelation <empty>,
[json#0] (PlanT
est.scala:179)
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48308 from nikhilsheoran-db/SPARK-49743-3.5.
Authored-by: Nikhil Sheoran
<[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala | 7 ++++---
.../sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala | 17 +++++++++++++++++
.../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 13 +++++++++++++
3 files changed, 34 insertions(+), 3 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala
index 4347137bf68b..04cc230f99b4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala
@@ -112,9 +112,10 @@ object OptimizeCsvJsonExprs extends Rule[LogicalPlan] {
val prunedSchema = StructType(Array(schema(ordinal)))
g.copy(child = j.copy(schema = prunedSchema), ordinal = 0)
- case g @ GetArrayStructFields(j @ JsonToStructs(schema: ArrayType, _, _,
_), _, _, _, _)
- if schema.elementType.asInstanceOf[StructType].length > 1 &&
j.options.isEmpty =>
- val prunedSchema = ArrayType(StructType(Array(g.field)), g.containsNull)
+ case g @ GetArrayStructFields(j @ JsonToStructs(ArrayType(schema:
StructType, _),
+ _, _, _), _, ordinal, _, _) if schema.length > 1 && j.options.isEmpty
=>
+ // Obtain the pruned schema by picking the `ordinal` field of the struct.
+ val prunedSchema = ArrayType(StructType(Array(schema(ordinal))),
g.containsNull)
g.copy(child = j.copy(schema = prunedSchema), ordinal = 0, numFields = 1)
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala
index c185de4c05d8..eed06da609f8 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala
@@ -307,4 +307,21 @@ class OptimizeJsonExprsSuite extends PlanTest with
ExpressionEvalHelper {
comparePlans(optimized, query.analyze)
}
}
+
+ test("SPARK-49743: prune unnecessary columns from GetArrayStructFields does
not change schema") {
+ val options = Map.empty[String, String]
+ val schema = ArrayType(StructType.fromDDL("a int, b int"), containsNull =
true)
+
+ val field = StructField("A", IntegerType) // Instead of "a", use "A" to
test case sensitivity.
+ val query = testRelation2
+ .select(GetArrayStructFields(
+ JsonToStructs(schema, options, $"json"), field, 0, 2, true).as("a"))
+ val optimized = Optimizer.execute(query.analyze)
+
+ val prunedSchema = ArrayType(StructType.fromDDL("a int"), containsNull =
true)
+ val expected = testRelation2
+ .select(GetArrayStructFields(
+ JsonToStructs(prunedSchema, options, $"json"), field, 0, 1,
true).as("a")).analyze
+ comparePlans(optimized, expected)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 71f4f17de61d..793a0da6a862 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4711,6 +4711,19 @@ class SQLQuerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
val df6 = df3.join(df2, col("df3.zaak_id") === col("df2.customer_id"),
"outer")
df5.crossJoin(df6)
}
+
+ test("SPARK-49743: OptimizeCsvJsonExpr does not change schema when pruning
struct") {
+ val df = sql("""
+ | SELECT
+ | from_json('[{"a": '||id||', "b": '|| (2*id) ||'}]',
'array<struct<a: INT, b: INT>>').a,
+ | from_json('[{"a": '||id||', "b": '|| (2*id) ||'}]',
'array<struct<a: INT, b: INT>>').A
+ | FROM
+ | range(3) as t
+ |""".stripMargin)
+ val expectedAnswer = Seq(
+ Row(Array(0), Array(0)), Row(Array(1), Array(1)), Row(Array(2),
Array(2)))
+ checkAnswer(df, expectedAnswer)
+ }
}
case class Foo(bar: Option[String])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]