This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8163fa4f581d [SPARK-53124][SQL] Prune unnecessary fields from JsonTuple
8163fa4f581d is described below

commit 8163fa4f581dd03d2cfc385f8b8709e1e5efd668
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Wed Aug 13 11:51:10 2025 +0800

    [SPARK-53124][SQL] Prune unnecessary fields from JsonTuple

    ### What changes were proposed in this pull request?

    This PR enhances the `GenerateOptimization` rule in Spark SQL Catalyst to improve the pruning of unnecessary fields from `JsonTuple` generators. Explicitly handled these cases:
    - No generator outputs used: remove the Generate node.
    - Some generator outputs used: prune the JsonTuple to keep only necessary fields.

    Example:
    ```sql
    SELECT f2
    FROM (SELECT '{"f1": "Spark", "f2": 2025, "f3": 8}' AS json) test
    LATERAL VIEW json_tuple(json, 'f1', 'f2', 'f2') jt AS f1, f2, f3
    ```

    Before this PR:
    ```
    == Optimized Logical Plan ==
    Project [f2#2]
    +- Generate json_tuple({"f1": "Spark", "f2": 2025, "f3": 8}, f1, f2, f2), false, jt, [f1#1, f2#2, f3#3]
       +- OneRowRelation
    ```

    After this PR:
    ```
    == Optimized Logical Plan ==
    Generate json_tuple({"f1": "Spark", "f2": 2025, "f3": 8}, f2), false, jt, [f2#2]
    +- OneRowRelation
    ```

    ### Why are the changes needed?

    Prune unnecessary JSON fields, reducing data processing overhead and improving query performance.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Added unit tests covering all scenarios: no outputs used, some outputs used, and all outputs used.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #51843 from wangyum/SPARK-53124.
Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 22 ++++++++ .../optimizer/GenerateOptimizationSuite.scala | 58 ++++++++++++++++++++-- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 60f7145ed6e3..4b149ebfb1f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -2585,6 +2585,28 @@ object GenerateOptimization extends Rule[LogicalPlan] { p.withNewChildren(Seq(updatedGenerate)) case _ => p } + + case p @ Project(_, g: Generate) if g.generator.isInstanceOf[JsonTuple] => + val generatorOutput = g.generatorOutput + val usedOutputs = + AttributeSet(generatorOutput).intersect(AttributeSet(p.projectList.flatMap(_.references))) + + usedOutputs.size match { + case 0 => + p.withNewChildren(g.children) + case n if n < generatorOutput.size => + val originJsonTuple = g.generator.asInstanceOf[JsonTuple] + val (newJsonExpressions, newGeneratorOutput) = + generatorOutput.zipWithIndex.collect { + case (attr, i) if usedOutputs.contains(attr) => + (originJsonTuple.children(i + 1), attr) + }.unzip + p.withNewChildren(Seq(g.copy( + generator = JsonTuple(originJsonTuple.children.head +: newJsonExpressions), + generatorOutput = newGeneratorOutput))) + case _ => + p + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/GenerateOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/GenerateOptimizationSuite.scala index 7d67284e46f0..60675a45f08d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/GenerateOptimizationSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/GenerateOptimizationSuite.scala @@ -19,12 +19,12 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.Explode +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Explode, JsonTuple, Literal} import org.apache.spark.sql.catalyst.optimizer.NestedColumnAliasingSuite.collectGeneratedAliases import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Generate, LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructType} class GenerateOptimizationSuite extends PlanTest { @@ -78,4 +78,56 @@ class GenerateOptimizationSuite extends PlanTest { comparePlans(optimized, expected) } + + test("SPARK-53124: prune unnecessary fields from JsonTuple") { + val jsonStrAttr = AttributeReference("json_str", StringType)() + val aAttr = AttributeReference("a", StringType)() + val bAttr = AttributeReference("b", StringType)() + val cAttr = AttributeReference("c", StringType)() + val jsonRelation = LocalRelation(jsonStrAttr) + + // Create JsonTuple generator with 3 fields + val jsonTuple = JsonTuple(Seq(jsonStrAttr, Literal("a"), Literal("b"), Literal("c"))) + val generate = Generate( + generator = jsonTuple, + unrequiredChildIndex = Nil, + outer = false, + qualifier = None, + generatorOutput = Seq(aAttr, bAttr, cAttr), + child = jsonRelation) + + // Case 1: No generator outputs used - should eliminate Generate completely + val projectNone = Project(Seq(jsonStrAttr), generate).analyze + val optimizedNone = Optimize.execute(projectNone) + val expectedNone = jsonRelation.select(jsonStrAttr).analyze 
+ comparePlans(optimizedNone, expectedNone) + + // Case 2: Some generator outputs used (just 'b') + val projectSome = Project(Seq(bAttr), generate).analyze + val optimizedSome = Optimize.execute(projectSome) + val expectedSome = Project( + Seq(bAttr), + Generate( + generator = JsonTuple(Seq(jsonStrAttr, Literal("b"))), + unrequiredChildIndex = Seq(0), + outer = false, + qualifier = None, + generatorOutput = Seq(bAttr), + child = jsonRelation)).analyze + comparePlans(optimizedSome, expectedSome) + + // Case 3: All generator outputs used - plan should remain unchanged + val projectAll = Project(Seq(aAttr, bAttr, cAttr), generate) + val optimizedAll = Optimize.execute(projectAll) + val expectedAll = Project( + Seq(aAttr, bAttr, cAttr), + Generate( + generator = jsonTuple, + unrequiredChildIndex = Seq(0), + outer = false, + qualifier = None, + generatorOutput = Seq(aAttr, bAttr, cAttr), + child = jsonRelation)).analyze + comparePlans(optimizedAll, expectedAll) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org