This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new fdfb8f6d6 [KYUUBI #5170] Identifier compatible with spark3.4
fdfb8f6d6 is described below

commit fdfb8f6d66fe442ad86abeb22f44569054b6a9d7
Author: odone <[email protected]>
AuthorDate: Wed Aug 16 16:07:54 2023 +0800

    [KYUUBI #5170] Identifier compatible with spark3.4
    
    ### _Why are the changes needed?_
    
    close #5170
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No
    
    Closes #5171 from iodone/kyuubi-5170.
    
    Closes #5170
    
    7cd0e2cbf [odone] identifier compatible with sprk3.4
    
    Authored-by: odone <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/extensions/engines/spark/lineage.md                      |  7 ++++---
 .../plugin/lineage/helper/SparkSQLLineageParseHelper.scala    | 11 ++++++++---
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/docs/extensions/engines/spark/lineage.md 
b/docs/extensions/engines/spark/lineage.md
index 01acd884d..2dbb2a026 100644
--- a/docs/extensions/engines/spark/lineage.md
+++ b/docs/extensions/engines/spark/lineage.md
@@ -45,14 +45,14 @@ The lineage of this SQL:
 
 ```json
 {
-   "inputTables": ["default.test_table0"],
+   "inputTables": ["spark_catalog.default.test_table0"],
    "outputTables": [],
    "columnLineage": [{
       "column": "col0",
-      "originalColumns": ["default.test_table0.a"]
+      "originalColumns": ["spark_catalog.default.test_table0.a"]
    }, {
       "column": "col1",
-      "originalColumns": ["default.test_table0.b"]
+      "originalColumns": ["spark_catalog.default.test_table0.b"]
    }]
 }
 ```
@@ -125,6 +125,7 @@ The available `spark.version`s are shown in the following 
table.
 | Spark Version | Supported | Remark |
 |:-------------:|:---------:|:------:|
 |    master     |     √     |   -    |
+|     3.4.x     |     √     |   -    |
 |     3.3.x     |     √     |   -    |
 |     3.2.x     |     √     |   -    |
 |     3.1.x     |     √     |   -    |
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index dad37eac5..ab669aa19 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -53,7 +53,7 @@ trait LineageParser {
     val columnsLineage =
       extractColumnsLineage(plan, ListMap[Attribute, 
AttributeSet]()).toList.collect {
         case (k, attrs) =>
-          k.name -> attrs.map(_.qualifiedName).toSet
+          k.name -> attrs.map(attr => (attr.qualifier :+ 
attr.name).mkString(".")).toSet
       }
     val (inputTables, outputTables) = columnsLineage.foldLeft((List[String](), 
List[String]())) {
       case ((inputs, outputs), (out, in)) =>
@@ -324,7 +324,8 @@ trait LineageParser {
           nextColumnsLlineage.map { case (k, _) => (k, AttributeSet(k)) })
         val sourceColumnsLineage = extractColumnsLineage(sourceTable, 
nextColumnsLlineage)
         val targetColumnsWithTargetTable = 
targetColumnsLineage.values.flatten.map { column =>
-          column.withName(s"${column.qualifiedName}")
+          val unquotedQualifiedName = (column.qualifier :+ 
column.name).mkString(".")
+          column.withName(unquotedQualifiedName)
         }
         
ListMap(targetColumnsWithTargetTable.zip(sourceColumnsLineage.values).toSeq: _*)
 
@@ -497,7 +498,11 @@ trait LineageParser {
   }
 
   private def getV1TableName(qualifiedName: String): String = {
-    Seq(LineageConf.DEFAULT_CATALOG, 
qualifiedName).filter(_.nonEmpty).mkString(".")
+    qualifiedName.split("\\.") match {
+      case Array(database, table) =>
+        Seq(LineageConf.DEFAULT_CATALOG, database, 
table).filter(_.nonEmpty).mkString(".")
+      case _ => qualifiedName
+    }
   }
 }
 

Reply via email to