This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8163fa4f581d [SPARK-53124][SQL] Prune unnecessary fields from JsonTuple
8163fa4f581d is described below

commit 8163fa4f581dd03d2cfc385f8b8709e1e5efd668
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Wed Aug 13 11:51:10 2025 +0800

    [SPARK-53124][SQL] Prune unnecessary fields from JsonTuple
    
    ### What changes were proposed in this pull request?
    
    This PR enhances the `GenerateOptimization` rule in Spark SQL Catalyst to 
improve the pruning of unnecessary fields from `JsonTuple` generators.
    
    Explicitly handled these cases:
    - No generator outputs used: remove the Generate node.
    - Some generator outputs used: prune the JsonTuple to keep only necessary 
fields.
    
    Example:
    ```sql
    SELECT f2
    FROM (SELECT '{"f1": "Spark", "f2": 2025, "f3": 8}' AS json) test
    LATERAL VIEW json_tuple(json, 'f1', 'f2', 'f3') jt AS f1, f2, f3
    ```
    
    Before this PR:
    ```
    == Optimized Logical Plan ==
    Project [f2#2]
    +- Generate json_tuple({"f1": "Spark", "f2": 2025, "f3": 8}, f1, f2, f3), 
false, jt, [f1#1, f2#2, f3#3]
       +- OneRowRelation
    ```
    
    After this PR:
    ```
    == Optimized Logical Plan ==
    Generate json_tuple({"f1": "Spark", "f2": 2025, "f3": 8}, f2), false, jt, 
[f2#2]
    +- OneRowRelation
    ```
    
    ### Why are the changes needed?
    
    Prune unnecessary JSON fields, reducing data processing overhead and 
improving query performance.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added unit tests covering all scenarios: no outputs used, some outputs 
used, and all outputs used.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51843 from wangyum/SPARK-53124.
    
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 22 ++++++++
 .../optimizer/GenerateOptimizationSuite.scala      | 58 ++++++++++++++++++++--
 2 files changed, 77 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 60f7145ed6e3..4b149ebfb1f3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -2585,6 +2585,28 @@ object GenerateOptimization extends Rule[LogicalPlan] {
             p.withNewChildren(Seq(updatedGenerate))
           case _ => p
         }
+
+      case p @ Project(_, g: Generate) if g.generator.isInstanceOf[JsonTuple] 
=>
+        val generatorOutput = g.generatorOutput
+        val usedOutputs =
+          
AttributeSet(generatorOutput).intersect(AttributeSet(p.projectList.flatMap(_.references)))
+
+        usedOutputs.size match {
+          case 0 =>
+            p.withNewChildren(g.children)
+          case n if n < generatorOutput.size =>
+            val originJsonTuple = g.generator.asInstanceOf[JsonTuple]
+            val (newJsonExpressions, newGeneratorOutput) =
+              generatorOutput.zipWithIndex.collect {
+                case (attr, i) if usedOutputs.contains(attr) =>
+                  (originJsonTuple.children(i + 1), attr)
+              }.unzip
+            p.withNewChildren(Seq(g.copy(
+              generator = JsonTuple(originJsonTuple.children.head +: 
newJsonExpressions),
+              generatorOutput = newGeneratorOutput)))
+          case _ =>
+            p
+        }
     }
 }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/GenerateOptimizationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/GenerateOptimizationSuite.scala
index 7d67284e46f0..60675a45f08d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/GenerateOptimizationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/GenerateOptimizationSuite.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.Explode
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Explode, 
JsonTuple, Literal}
 import 
org.apache.spark.sql.catalyst.optimizer.NestedColumnAliasingSuite.collectGeneratedAliases
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, LocalRelation, 
LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
 
 class GenerateOptimizationSuite extends PlanTest {
 
@@ -78,4 +78,56 @@ class GenerateOptimizationSuite extends PlanTest {
 
     comparePlans(optimized, expected)
   }
+
+  test("SPARK-53124: prune unnecessary fields from JsonTuple") {
+    val jsonStrAttr = AttributeReference("json_str", StringType)()
+    val aAttr = AttributeReference("a", StringType)()
+    val bAttr = AttributeReference("b", StringType)()
+    val cAttr = AttributeReference("c", StringType)()
+    val jsonRelation = LocalRelation(jsonStrAttr)
+
+    // Create JsonTuple generator with 3 fields
+    val jsonTuple = JsonTuple(Seq(jsonStrAttr, Literal("a"), Literal("b"), 
Literal("c")))
+    val generate = Generate(
+      generator = jsonTuple,
+      unrequiredChildIndex = Nil,
+      outer = false,
+      qualifier = None,
+      generatorOutput = Seq(aAttr, bAttr, cAttr),
+      child = jsonRelation)
+
+    // Case 1: No generator outputs used - should eliminate Generate completely
+    val projectNone = Project(Seq(jsonStrAttr), generate).analyze
+    val optimizedNone = Optimize.execute(projectNone)
+    val expectedNone = jsonRelation.select(jsonStrAttr).analyze
+    comparePlans(optimizedNone, expectedNone)
+
+    // Case 2: Some generator outputs used (just 'b')
+    val projectSome = Project(Seq(bAttr), generate).analyze
+    val optimizedSome = Optimize.execute(projectSome)
+    val expectedSome = Project(
+      Seq(bAttr),
+      Generate(
+        generator = JsonTuple(Seq(jsonStrAttr, Literal("b"))),
+        unrequiredChildIndex = Seq(0),
+        outer = false,
+        qualifier = None,
+        generatorOutput = Seq(bAttr),
+        child = jsonRelation)).analyze
+    comparePlans(optimizedSome, expectedSome)
+
+    // Case 3: All generator outputs used - plan should remain unchanged
+    val projectAll = Project(Seq(aAttr, bAttr, cAttr), generate)
+    val optimizedAll = Optimize.execute(projectAll)
+    val expectedAll = Project(
+      Seq(aAttr, bAttr, cAttr),
+      Generate(
+        generator = jsonTuple,
+        unrequiredChildIndex = Seq(0),
+        outer = false,
+        qualifier = None,
+        generatorOutput = Seq(aAttr, bAttr, cAttr),
+        child = jsonRelation)).analyze
+    comparePlans(optimizedAll, expectedAll)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to