This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new fdfb8f6d6 [KYUUBI #5170] Identifier compatible with spark3.4
fdfb8f6d6 is described below
commit fdfb8f6d66fe442ad86abeb22f44569054b6a9d7
Author: odone <[email protected]>
AuthorDate: Wed Aug 16 16:07:54 2023 +0800
[KYUUBI #5170] Identifier compatible with spark3.4
### _Why are the changes needed?_
close #5170
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No
Closes #5171 from iodone/kyuubi-5170.
Closes #5170
7cd0e2cbf [odone] identifier compatible with spark3.4
Authored-by: odone <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/extensions/engines/spark/lineage.md | 7 ++++---
.../plugin/lineage/helper/SparkSQLLineageParseHelper.scala | 11 ++++++++---
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/docs/extensions/engines/spark/lineage.md b/docs/extensions/engines/spark/lineage.md
index 01acd884d..2dbb2a026 100644
--- a/docs/extensions/engines/spark/lineage.md
+++ b/docs/extensions/engines/spark/lineage.md
@@ -45,14 +45,14 @@ The lineage of this SQL:
```json
{
- "inputTables": ["default.test_table0"],
+ "inputTables": ["spark_catalog.default.test_table0"],
"outputTables": [],
"columnLineage": [{
"column": "col0",
- "originalColumns": ["default.test_table0.a"]
+ "originalColumns": ["spark_catalog.default.test_table0.a"]
}, {
"column": "col1",
- "originalColumns": ["default.test_table0.b"]
+ "originalColumns": ["spark_catalog.default.test_table0.b"]
}]
}
```
@@ -125,6 +125,7 @@ The available `spark.version`s are shown in the following table.
| Spark Version | Supported | Remark |
|:-------------:|:---------:|:------:|
| master | √ | - |
+| 3.4.x | √ | - |
| 3.3.x | √ | - |
| 3.2.x | √ | - |
| 3.1.x | √ | - |
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index dad37eac5..ab669aa19 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -53,7 +53,7 @@ trait LineageParser {
val columnsLineage =
extractColumnsLineage(plan, ListMap[Attribute, AttributeSet]()).toList.collect {
case (k, attrs) =>
- k.name -> attrs.map(_.qualifiedName).toSet
+ k.name -> attrs.map(attr => (attr.qualifier :+ attr.name).mkString(".")).toSet
}
val (inputTables, outputTables) = columnsLineage.foldLeft((List[String](), List[String]())) {
case ((inputs, outputs), (out, in)) =>
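For context, this hunk (and the similar one below) replaces `qualifiedName` with an explicit join of `qualifier :+ name`, presumably because Spark 3.4's `qualifiedName` can quote individual name parts while the lineage output should stay as plain dot-separated identifiers. A minimal, self-contained sketch of that joining logic follows; the `Column` case class is a hypothetical stand-in for Catalyst's `Attribute`, used only to illustrate the string construction:

```scala
object QualifiedNameSketch extends App {
  // Hypothetical stand-in for Catalyst's Attribute: only qualifier and name matter here.
  case class Column(qualifier: Seq[String], name: String)

  // Mirrors the new lineage code: join the qualifier parts and the column name with dots,
  // without quoting any individual part.
  def unquotedQualifiedName(col: Column): String =
    (col.qualifier :+ col.name).mkString(".")

  // A column `a` of table `test_table0` in the default database of `spark_catalog`,
  // matching the updated documentation example above.
  val col = Column(Seq("spark_catalog", "default", "test_table0"), "a")
  assert(unquotedQualifiedName(col) == "spark_catalog.default.test_table0.a")
  println(unquotedQualifiedName(col)) // spark_catalog.default.test_table0.a
}
```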
@@ -324,7 +324,8 @@ trait LineageParser {
nextColumnsLlineage.map { case (k, _) => (k, AttributeSet(k)) })
val sourceColumnsLineage = extractColumnsLineage(sourceTable, nextColumnsLlineage)
val targetColumnsWithTargetTable = targetColumnsLineage.values.flatten.map { column =>
- column.withName(s"${column.qualifiedName}")
+ val unquotedQualifiedName = (column.qualifier :+ column.name).mkString(".")
+ column.withName(unquotedQualifiedName)
}
ListMap(targetColumnsWithTargetTable.zip(sourceColumnsLineage.values).toSeq: _*)
@@ -497,7 +498,11 @@ trait LineageParser {
}
private def getV1TableName(qualifiedName: String): String = {
- Seq(LineageConf.DEFAULT_CATALOG, qualifiedName).filter(_.nonEmpty).mkString(".")
+ qualifiedName.split("\\.") match {
+ case Array(database, table) =>
+ Seq(LineageConf.DEFAULT_CATALOG, database, table).filter(_.nonEmpty).mkString(".")
+ case _ => qualifiedName
+ }
}
}
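The net effect of the `getV1TableName` change can be sketched as below, assuming `LineageConf.DEFAULT_CATALOG` resolves to `spark_catalog` (an assumption consistent with the documentation example above): a two-part `database.table` name gets the default catalog prepended, while any other shape, such as an already catalog-qualified name, passes through unchanged.

```scala
object GetV1TableNameSketch extends App {
  // Assumption: the plugin's default catalog name; in Kyuubi this comes from LineageConf.DEFAULT_CATALOG.
  val defaultCatalog = "spark_catalog"

  // Mirrors the patched getV1TableName: only a two-part database.table name
  // gets the default catalog prepended; other shapes are returned as-is.
  def getV1TableName(qualifiedName: String): String =
    qualifiedName.split("\\.") match {
      case Array(database, table) =>
        Seq(defaultCatalog, database, table).filter(_.nonEmpty).mkString(".")
      case _ => qualifiedName
    }

  assert(getV1TableName("default.test_table0") == "spark_catalog.default.test_table0")
  assert(getV1TableName("spark_catalog.default.test_table0") == "spark_catalog.default.test_table0")
}
```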