This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new 86bb35d335 [KYUUBI #6912][LINEAGE] Properly handle empty attribute set 
on mergeRelationColumnLineage
86bb35d335 is described below

commit 86bb35d33582728add6686e6fbdc9e35b1d0a80c
Author: xglv1985 <[email protected]>
AuthorDate: Fri Feb 14 10:27:51 2025 +0800

    [KYUUBI #6912][LINEAGE] Properly handle empty attribute set on 
mergeRelationColumnLineage
    
    # Why are the changes needed?
    ## Issue reference:
    https://github.com/apache/kyuubi/issues/6912
    
    ## How to reproduce the issue?
    The changes in this PR prevent a wrong result when generating an instance of org.apache.kyuubi.plugin.lineage.Lineage in the following case:
    step 1: create a temporary view from a file
    step 2: insert into a table by selecting from the temporary view in step 1
    step 3: generate the lineage when executing the insert statement in step 2
    For details, see the unit test added in this patch.
    
    ## The issue analysis
    Here is the current code that builds the Lineage object by resolving a LogicalPlan:
    <img width="694" alt="image" src="https://github.com/user-attachments/assets/65256a0d-320d-4271-968f-59eafb74de9f" />
    
    With the logic above, in this case the "try-catch" self-protection swallows the failure, so None is produced instead of an org.apache.kyuubi.plugin.lineage.Lineage object. This None value causes problems in the following two scenarios:
    ### Unit Test Environment
    In the unit test, when execution reaches this point, a "None.get" exception is raised:
    <img width="682" alt="image" src="https://github.com/user-attachments/assets/102dc9bd-294f-4b1e-b1c6-01b6fee50fed" />
    
    Here is the runtime stack trace:
    ```
    None.get
    java.util.NoSuchElementException: None.get
            at scala.None$.get(Option.scala:529)
            at scala.None$.get(Option.scala:527)
            at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParserHelperSuite.extractLineageWithoutExecuting(SparkSQLLineageParserHelperSuite.scala:1485)
            at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParserHelperSuite.$anonfun$new$83(SparkSQLLineageParserHelperSuite.scala:1465)
    ```
    ### Production Environment
    In production, the result is unusable: it is None and therefore lacks the necessary lineage information. The correct Lineage instance for the above case is:
    ```
    inputTables(List())
    outputTables(List(spark_catalog.test_db.test_table_from_dir))
    columnLineage(List(ColumnLineage(spark_catalog.test_db.test_table_from_dir.a0,Set()), ColumnLineage(spark_catalog.test_db.test_table_from_dir.b0,Set())))
    ```
    
    A newly added test case (test directory to table) passes with this fix.
    
    # How to fix the issue?
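    Add an emptiness check in mergeRelationColumnLineage: when the remaining relation column lineage is empty, map the output attribute to an empty AttributeSet instead of calling `head` on the empty collection. For details, see the code change in this patch.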
    
    # How was this patch tested?
    1. Added a new unit test case and verified that it passes.
    2. Submitted a Spark application containing the SQL of this case in a production environment and verified that a correct Lineage instance is generated instead of None.
    
    # Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #6911 from xglv1985/fix_spark_lineage_runtime_exception.
    
    Closes #6912
    
    13a71075d [Cheng Pan] Update 
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
    4e89b95cd [Cheng Pan] Update 
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
    59b350bfb [xglv1985] fix a runtime exception when generate column lineage 
tuple--more readable code
    52bc0288d [xglv1985] fix a runtime exception when generate column lineage 
tuple--spotless sytle
    fea6bbc0d [xglv1985] fix a runtime exception when generate column lineage 
tuple--remove tab from UT code
    901879095 [xglv1985] fix a runtime exception when generate column lineage 
tuple--unit test
    fbb4df879 [xglv1985] fix a runtime exception when generate column lineage 
tuple
    
    Lead-authored-by: xglv1985 <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 7c110b68f84af490b42d1519716661583dc7edd2)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../helper/SparkSQLLineageParseHelper.scala        |  2 ++
 .../helper/SparkSQLLineageParserHelperSuite.scala  | 36 +++++++++++++++++++++-
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index 2731114645..effe2ae831 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -176,6 +176,8 @@ trait LineageParser {
       relationColumnLineage: AttributeMap[AttributeSet]): 
AttributeMap[AttributeSet] = {
     val mergedRelationColumnLineage = {
       relationOutput.foldLeft((ListMap[Attribute, AttributeSet](), 
relationColumnLineage)) {
+        case ((acc, x), attr) if x.isEmpty =>
+          (acc + (attr -> AttributeSet.empty), x.empty)
         case ((acc, x), attr) =>
           (acc + (attr -> x.head._2), x.tail)
       }._1
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index 7612743798..9d8aee23ff 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -59,10 +59,16 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
       "  partitioned by(pid)")
     spark.sql("create table if not exists test_db0.test_table1" +
       " (key int, value string) using parquet")
+    spark.sql("create table test_db.test_table_from_dir" +
+      " (`a0` string, `b0` string) using parquet")
   }
 
   override def afterAll(): Unit = {
-    Seq("test_db0.test_table0", "test_db0.test_table1", 
"test_db0.test_table_part0").foreach { t =>
+    Seq(
+      "test_db0.test_table0",
+      "test_db0.test_table1",
+      "test_db0.test_table_part0",
+      "test_db.test_table_from_dir").foreach { t =>
       spark.sql(s"drop table if exists $t")
     }
     spark.sql("drop database if exists test_db")
@@ -1442,6 +1448,34 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
     }
   }
 
+  test("test directory to table") {
+    val inputFile = getClass.getResource("/").getPath + "input_file"
+    val sourceFile = File(inputFile).createFile()
+    spark.sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW temp_view (
+         | `a` STRING COMMENT '',
+         | `b` STRING COMMENT ''
+         |) USING csv OPTIONS(
+         |  sep='\t',
+         |  path='${sourceFile.path}'
+         |);
+         |""".stripMargin).collect()
+
+    val ret0 = extractLineageWithoutExecuting(
+      s"""
+         |INSERT OVERWRITE TABLE test_db.test_table_from_dir
+         |SELECT `a`, `b` FROM temp_view
+         |""".stripMargin)
+
+    assert(ret0 == Lineage(
+      List(),
+      List(s"spark_catalog.test_db.test_table_from_dir"),
+      List(
+        (s"spark_catalog.test_db.test_table_from_dir.a0", Set()),
+        (s"spark_catalog.test_db.test_table_from_dir.b0", Set()))))
+  }
+
   private def extractLineageWithoutExecuting(sql: String): Lineage = {
     val parsed = spark.sessionState.sqlParser.parsePlan(sql)
     val analyzed = spark.sessionState.analyzer.execute(parsed)

Reply via email to