This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new 86bb35d335 [KYUUBI #6912][LINEAGE] Properly handle empty attribute set
on mergeRelationColumnLineage
86bb35d335 is described below
commit 86bb35d33582728add6686e6fbdc9e35b1d0a80c
Author: xglv1985 <[email protected]>
AuthorDate: Fri Feb 14 10:27:51 2025 +0800
[KYUUBI #6912][LINEAGE] Properly handle empty attribute set on
mergeRelationColumnLineage
# Why are the changes needed?
## Issue reference:
https://github.com/apache/kyuubi/issues/6912
## How to reproduce the issue?
The changes in this PR avoid an incorrect result when generating an
instance of org.apache.kyuubi.plugin.lineage.Lineage in the following case:
step 1: create a temporary view from a file
step 2: insert into a table by selecting from the temporary view in step 1
step 3: generate the lineage when executing the insert statement in step 2
For details, see the unit test added in this patch.
## The issue analysis
Consider the current code that obtains the Lineage object by resolving a
LogicalPlan:
[Screenshot of the current code: https://github.com/user-attachments/assets/65256a0d-320d-4271-968f-59eafb74de9f]
According to the logic above, in this case the generated
org.apache.kyuubi.plugin.lineage.Lineage is None, because the parser's
defensive "try-catch" catches an exception raised while resolving the plan.
This None result causes problems in the following two scenarios:
### Unit Test Environment
In the unit test, a "None.get" exception is raised when the code reaches this
point:
[Screenshot of the failing test code: https://github.com/user-attachments/assets/102dc9bd-294f-4b1e-b1c6-01b6fee50fed]
Here is the runtime exception stack trace:
```
None.get
java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParserHelperSuite.extractLineageWithoutExecuting(SparkSQLLineageParserHelperSuite.scala:1485)
    at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParserHelperSuite.$anonfun$new$83(SparkSQLLineageParserHelperSuite.scala:1465)
```
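The failure chain can be reproduced outside Spark with a minimal sketch. It assumes plain Scala collections stand in for Spark's types (String for Attribute, Set[String] for AttributeSet, Map[String, Set[String]] for AttributeMap[AttributeSet]); the object and method names are hypothetical, and scala.util.Try stands in for the parser's try-catch. The fold consumes one entry of the relation column lineage per output attribute, so an empty lineage map makes `head` throw, the exception is swallowed into None, and unwrapping that None with `.get` produces the None.get error shown in the stack trace.

```scala
import scala.collection.immutable.ListMap
import scala.util.Try

// Simplified model (assumption): String stands in for Spark's Attribute,
// Set[String] for AttributeSet, Map[String, Set[String]] for AttributeMap.
object BuggyLineageSketch {

  // Mirrors the shape of the original fold: every output attribute takes the
  // head of the remaining relation column lineage. When that map is empty,
  // `x.head` throws NoSuchElementException.
  def merge(
      relationOutput: Seq[String],
      relationColumnLineage: Map[String, Set[String]]): ListMap[String, Set[String]] =
    relationOutput.foldLeft((ListMap.empty[String, Set[String]], relationColumnLineage)) {
      case ((acc, x), attr) => (acc + (attr -> x.head._2), x.tail)
    }._1

  // Hypothetical stand-in for the parser's defensive try-catch: any non-fatal
  // exception is swallowed and surfaces as None instead of a lineage result.
  def parse(
      relationOutput: Seq[String],
      relationColumnLineage: Map[String, Set[String]]): Option[ListMap[String, Set[String]]] =
    Try(merge(relationOutput, relationColumnLineage)).toOption

  // Hypothetical stand-in for a test helper that unwraps the result with .get.
  def extract(
      relationOutput: Seq[String],
      relationColumnLineage: Map[String, Set[String]]): ListMap[String, Set[String]] =
    parse(relationOutput, relationColumnLineage).get
}
```

For example, with output columns a0 and b0 and no upstream lineage, `parse` returns None and `extract` throws java.util.NoSuchElementException with the message "None.get".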
### Production Environment
This Lineage result cannot be used in the production environment because it
is None and therefore lacks the necessary lineage information. For the case
above, the correct content of the Lineage instance should be:
```
inputTables(List())
outputTables(List(spark_catalog.test_db.test_table_from_dir))
columnLineage(List(ColumnLineage(spark_catalog.test_db.test_table_from_dir.a0,Set()),
ColumnLineage(spark_catalog.test_db.test_table_from_dir.b0,Set())))
```
The newly added test case (test directory to table) passes once this issue is
fixed.
# How to fix the issue?
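Add an emptiness check: when no relation column lineage remains, map the
output attribute to an empty AttributeSet instead of taking the head of an
empty map. For details, see the code change in this patch.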
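A minimal sketch of the emptiness check, assuming plain Scala collections in place of Spark's types (String for Attribute, Set[String] for AttributeSet, Map[String, Set[String]] for AttributeMap[AttributeSet]); the object name is hypothetical. Like the patch, it adds a guarded case that maps the attribute to an empty set when no relation column lineage remains, checked before the original case would call `head`.

```scala
import scala.collection.immutable.ListMap

// Simplified model (assumption): String stands in for Spark's Attribute,
// Set[String] for AttributeSet, Map[String, Set[String]] for AttributeMap.
object FixedLineageSketch {

  def merge(
      relationOutput: Seq[String],
      relationColumnLineage: Map[String, Set[String]]): ListMap[String, Set[String]] =
    relationOutput.foldLeft((ListMap.empty[String, Set[String]], relationColumnLineage)) {
      // No relation column lineage left: record an empty set for this
      // attribute instead of calling head on an empty map.
      case ((acc, x), attr) if x.isEmpty =>
        (acc + (attr -> Set.empty[String]), x.empty)
      // Otherwise consume the next lineage entry, as the original code does.
      case ((acc, x), attr) =>
        (acc + (attr -> x.head._2), x.tail)
    }._1
}
```

With output columns a0 and b0 and an empty lineage map, `merge` maps both columns to an empty set, which matches the columnLineage expected for the case described above.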
# How was this patch tested?
1. Added a new unit test case and verified that it passes.
2. Submitted a Spark application containing the SQL of this case in the
production environment and verified that a correct Lineage instance is
generated instead of None.
# Was this patch authored or co-authored using generative AI tooling?
No
Closes #6911 from xglv1985/fix_spark_lineage_runtime_exception.
Closes #6912
13a71075d [Cheng Pan] Update
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
4e89b95cd [Cheng Pan] Update
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
59b350bfb [xglv1985] fix a runtime exception when generate column lineage
tuple--more readable code
52bc0288d [xglv1985] fix a runtime exception when generate column lineage
tuple--spotless sytle
fea6bbc0d [xglv1985] fix a runtime exception when generate column lineage
tuple--remove tab from UT code
901879095 [xglv1985] fix a runtime exception when generate column lineage
tuple--unit test
fbb4df879 [xglv1985] fix a runtime exception when generate column lineage
tuple
Lead-authored-by: xglv1985 <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 7c110b68f84af490b42d1519716661583dc7edd2)
Signed-off-by: Cheng Pan <[email protected]>
---
.../helper/SparkSQLLineageParseHelper.scala | 2 ++
.../helper/SparkSQLLineageParserHelperSuite.scala | 36 +++++++++++++++++++++-
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index 2731114645..effe2ae831 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -176,6 +176,8 @@ trait LineageParser {
relationColumnLineage: AttributeMap[AttributeSet]):
AttributeMap[AttributeSet] = {
val mergedRelationColumnLineage = {
relationOutput.foldLeft((ListMap[Attribute, AttributeSet](),
relationColumnLineage)) {
+ case ((acc, x), attr) if x.isEmpty =>
+ (acc + (attr -> AttributeSet.empty), x.empty)
case ((acc, x), attr) =>
(acc + (attr -> x.head._2), x.tail)
}._1
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index 7612743798..9d8aee23ff 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -59,10 +59,16 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
" partitioned by(pid)")
spark.sql("create table if not exists test_db0.test_table1" +
" (key int, value string) using parquet")
+ spark.sql("create table test_db.test_table_from_dir" +
+ " (`a0` string, `b0` string) using parquet")
}
override def afterAll(): Unit = {
- Seq("test_db0.test_table0", "test_db0.test_table1", "test_db0.test_table_part0").foreach { t =>
+ Seq(
+ "test_db0.test_table0",
+ "test_db0.test_table1",
+ "test_db0.test_table_part0",
+ "test_db.test_table_from_dir").foreach { t =>
spark.sql(s"drop table if exists $t")
}
spark.sql("drop database if exists test_db")
@@ -1442,6 +1448,34 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
}
}
+ test("test directory to table") {
+ val inputFile = getClass.getResource("/").getPath + "input_file"
+ val sourceFile = File(inputFile).createFile()
+ spark.sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW temp_view (
+ | `a` STRING COMMENT '',
+ | `b` STRING COMMENT ''
+ |) USING csv OPTIONS(
+ | sep='\t',
+ | path='${sourceFile.path}'
+ |);
+ |""".stripMargin).collect()
+
+ val ret0 = extractLineageWithoutExecuting(
+ s"""
+ |INSERT OVERWRITE TABLE test_db.test_table_from_dir
+ |SELECT `a`, `b` FROM temp_view
+ |""".stripMargin)
+
+ assert(ret0 == Lineage(
+ List(),
+ List(s"spark_catalog.test_db.test_table_from_dir"),
+ List(
+ (s"spark_catalog.test_db.test_table_from_dir.a0", Set()),
+ (s"spark_catalog.test_db.test_table_from_dir.b0", Set()))))
+ }
+
private def extractLineageWithoutExecuting(sql: String): Lineage = {
val parsed = spark.sessionState.sqlParser.parsePlan(sql)
val analyzed = spark.sessionState.analyzer.execute(parsed)