This is an automated email from the ASF dual-hosted git repository.
wangzhen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 0436c459eb [KYUUBI #7180][LINEAGE] Subquery in the project should
always drill down to get the lineage relationships
0436c459eb is described below
commit 0436c459eb5a20062ad12b0b0a41da34517bde8c
Author: chenliang.lu <[email protected]>
AuthorDate: Tue Jan 13 09:29:21 2026 +0800
[KYUUBI #7180][LINEAGE] Subquery in the project should always drill down to
get the lineage relationships
### Why are the changes needed?
Given the tables below, the following query produces an incorrect column lineage result (the lineage of column `aa` is missing `table0.a`, the column aggregated inside the subquery):
create table table0(a int, b string, c string)
create table table1(a int, b string, c string)
select (select sum(a) from table0 where table1.b = table0.b) as aa, b from table1
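The root cause:
Since https://github.com/apache/spark/pull/32687, the `references` of a subquery expression are defined as its outer attribute references only. For the query above, the alias `aa` therefore has non-empty references (`table1.b`). The previous code only drilled into the subquery plan when those references were empty, so the columns read inside the subquery (such as `table0.a`) were dropped.
We should therefore always drill down into the subquery plan to get the corresponding column lineage relationships, and merge them with the outer references.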
### How was this patch tested?
Added new unit tests to SparkSQLLineageParserHelperSuite.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #7181 from yabola/lineage-subquery.
Closes #7180
866046aa2 [chenliang.lu] improve code
e9ded00b3 [chenliang.lu] improve the subquery extraction operator
5ea674475 [chenliang.lu] [KYUUBI #7180][LINEAGE] Subquery in the project
should drill down to get the column lineage relationships
Authored-by: chenliang.lu <[email protected]>
Signed-off-by: wforget <[email protected]>
---
.../helper/SparkSQLLineageParseHelper.scala | 10 +++++---
.../helper/SparkSQLLineageParserHelperSuite.scala | 29 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 4 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index 7f7248669f..27d8ae2467 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -128,15 +128,17 @@ trait LineageParser {
inputTablesByPlan: mutable.HashSet[String]): AttributeMap[AttributeSet]
= {
val exps = named.map {
case exp: Alias =>
+ val subqueryPlans = getExpressionSubqueryPlans(exp.child)
val references =
- if (exp.references.nonEmpty) exp.references
- else {
- val attrRefs = getExpressionSubqueryPlans(exp.child)
+ if (subqueryPlans.nonEmpty) {
+ val attrRefs = subqueryPlans
.map(extractColumnsLineage(_, ListMap[Attribute,
AttributeSet](), inputTablesByPlan))
.foldLeft(ListMap[Attribute,
AttributeSet]())(mergeColumnsLineage).values
.foldLeft(AttributeSet.empty)(_ ++ _)
.map(attr => attr.withQualifier(attr.qualifier :+
SUBQUERY_COLUMN_IDENTIFIER))
- AttributeSet(attrRefs)
+ AttributeSet(attrRefs) ++ exp.references
+ } else {
+ exp.references
}
(
exp.toAttribute,
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index 90da4650b1..86be8bb0d8 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -1139,6 +1139,35 @@ abstract class SparkSQLLineageParserHelperSuite extends
KyuubiFunSuite
List(
("a", Set(s"$DEFAULT_CATALOG.default.table1.a")),
("b", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
+
+ val sql12 =
+ """
+ |select (select sum(a) from table0 where table1.b = table0.b) as aa,
b from table1
+ |""".stripMargin
+ val ret12 = extractLineage(sql12)
+ assert(ret12 == Lineage(
+ List(s"$DEFAULT_CATALOG.default.table0",
s"$DEFAULT_CATALOG.default.table1"),
+ List(),
+ List(
+ ("aa", Set(s"$DEFAULT_CATALOG.default.table0.a",
s"$DEFAULT_CATALOG.default.table1.b")),
+ ("b", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
+
+ val sql13 =
+ """
+ |select if((select sum(a) from table0 where table1.b = table0.b) >
100, b, c) as aa,
+ | b from table1 """.stripMargin
+ val ret13 = extractLineage(sql13)
+ assert(ret13 == Lineage(
+ List(s"$DEFAULT_CATALOG.default.table0",
s"$DEFAULT_CATALOG.default.table1"),
+ List(),
+ List(
+ (
+ "aa",
+ Set(
+ s"$DEFAULT_CATALOG.default.table0.a",
+ s"$DEFAULT_CATALOG.default.table1.b",
+ s"$DEFAULT_CATALOG.default.table1.c")),
+ ("b", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
}
}