This is an automated email from the ASF dual-hosted git repository.

wangzhen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0436c459eb [KYUUBI #7180][LINEAGE] Subquery in the project should always drill down to get the lineage relationships
0436c459eb is described below

commit 0436c459eb5a20062ad12b0b0a41da34517bde8c
Author: chenliang.lu <[email protected]>
AuthorDate: Tue Jan 13 09:29:21 2026 +0800

    [KYUUBI #7180][LINEAGE] Subquery in the project should always drill down to get the lineage relationships
    
    ### Why are the changes needed?
    
    The following SQL statements produce an incorrect column lineage result:
    create table table0(a int, b string, c string)
    create table table1(a int, b string, c string)
    select (select sum(a) from table0 where table1.b = table0.b) as aa, b from table1
    
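    The root cause:
    Since https://github.com/apache/spark/pull/32687, the references of a subquery
    expression are defined as its outer attribute references. For a correlated
    subquery such as the one above, `exp.references` is therefore non-empty (it
    contains the outer reference `table1.b`). The previous code returned those
    references directly and never drilled into the subquery plan, so the lineage
    missed `table0.a`. We should always drill down into the subquery plan to get
    the corresponding column lineage relationships, and merge them with the outer
    references.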
    
    ### How was this patch tested?
    
    Added new unit tests (sql12 and sql13 in SparkSQLLineageParserHelperSuite).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #7181 from yabola/lineage-subquery.
    
    Closes #7180
    
    866046aa2 [chenliang.lu] improve code
    e9ded00b3 [chenliang.lu] improve the subquery extraction operator
    5ea674475 [chenliang.lu] [KYUUBI #7180][LINEAGE] Subquery in the project should drill down to get the column lineage relationships
    
    Authored-by: chenliang.lu <[email protected]>
    Signed-off-by: wforget <[email protected]>
---
 .../helper/SparkSQLLineageParseHelper.scala        | 10 +++++---
 .../helper/SparkSQLLineageParserHelperSuite.scala  | 29 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index 7f7248669f..27d8ae2467 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -128,15 +128,17 @@ trait LineageParser {
       inputTablesByPlan: mutable.HashSet[String]): AttributeMap[AttributeSet] 
= {
     val exps = named.map {
       case exp: Alias =>
+        val subqueryPlans = getExpressionSubqueryPlans(exp.child)
         val references =
-          if (exp.references.nonEmpty) exp.references
-          else {
-            val attrRefs = getExpressionSubqueryPlans(exp.child)
+          if (subqueryPlans.nonEmpty) {
+            val attrRefs = subqueryPlans
               .map(extractColumnsLineage(_, ListMap[Attribute, 
AttributeSet](), inputTablesByPlan))
               .foldLeft(ListMap[Attribute, 
AttributeSet]())(mergeColumnsLineage).values
               .foldLeft(AttributeSet.empty)(_ ++ _)
               .map(attr => attr.withQualifier(attr.qualifier :+ 
SUBQUERY_COLUMN_IDENTIFIER))
-            AttributeSet(attrRefs)
+            AttributeSet(attrRefs) ++ exp.references
+          } else {
+            exp.references
           }
         (
           exp.toAttribute,
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index 90da4650b1..86be8bb0d8 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -1139,6 +1139,35 @@ abstract class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
         List(
           ("a", Set(s"$DEFAULT_CATALOG.default.table1.a")),
           ("b", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
+
+      val sql12 =
+        """
+          |select (select sum(a) from table0 where table1.b = table0.b) as aa, 
b from table1
+          |""".stripMargin
+      val ret12 = extractLineage(sql12)
+      assert(ret12 == Lineage(
+        List(s"$DEFAULT_CATALOG.default.table0", 
s"$DEFAULT_CATALOG.default.table1"),
+        List(),
+        List(
+          ("aa", Set(s"$DEFAULT_CATALOG.default.table0.a", 
s"$DEFAULT_CATALOG.default.table1.b")),
+          ("b", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
+
+      val sql13 =
+        """
+          |select if((select sum(a) from table0 where table1.b = table0.b) > 
100, b, c) as aa,
+          | b from table1 """.stripMargin
+      val ret13 = extractLineage(sql13)
+      assert(ret13 == Lineage(
+        List(s"$DEFAULT_CATALOG.default.table0", 
s"$DEFAULT_CATALOG.default.table1"),
+        List(),
+        List(
+          (
+            "aa",
+            Set(
+              s"$DEFAULT_CATALOG.default.table0.a",
+              s"$DEFAULT_CATALOG.default.table1.b",
+              s"$DEFAULT_CATALOG.default.table1.c")),
+          ("b", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
     }
   }
 

Reply via email to