This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 5802a163c [KYUUBI #4331] [KYUUBI #4431] Lineage supports `Union` and
`MergeInto`
5802a163c is described below
commit 5802a163c120441d49b10fdbbae8e922c5c16977
Author: odone <[email protected]>
AuthorDate: Fri Mar 17 13:13:16 2023 +0800
[KYUUBI #4331] [KYUUBI #4431] Lineage supports `Union` and `MergeInto`
close #4331
close #4431
### _Why are the changes needed?_
Because `optimizedPlan` has been replaced by `analyzedPlan`, iceberg tables
do not need to be additionally processed when new logical plans are generated
during some optimization stages. They can be treated as regular tables.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4543 from iodone/fix-4431.
Closes #4331
Closes #4431
2adddcc5 [odone] fix `Union` and `MergeInto` bug
Authored-by: odone <[email protected]>
Signed-off-by: ulyssesyou <[email protected]>
(cherry picked from commit 04155f2d690552a038980d022363ef1d7c36fc5c)
Signed-off-by: ulyssesyou <[email protected]>
---
.../helper/SparkSQLLineageParseHelper.scala | 24 ++++++++++++++--------
.../helper/SparkSQLLineageParserHelperSuite.scala | 21 ++++++++++++++++++-
2 files changed, 36 insertions(+), 9 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index 1224da023..f78910aed 100644
---
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -311,7 +311,7 @@ trait LineageParser {
val nextColumnsLlineage = ListMap(allAssignments.map { assignment =>
(
assignment.key.asInstanceOf[Attribute],
- AttributeSet(assignment.value.asInstanceOf[Attribute]))
+ assignment.value.references)
}: _*)
val targetTable = getPlanField[LogicalPlan]("targetTable", plan)
val sourceTable = getPlanField[LogicalPlan]("sourceTable", plan)
@@ -370,14 +370,22 @@ trait LineageParser {
}
case p: Union =>
- // merge all children in to one derivedColumns
- val childrenUnion =
- p.children.map(extractColumnsLineage(_, ListMap[Attribute,
AttributeSet]())).map(
- _.values).reduce {
- (left, right) =>
- left.zip(right).map(attr => attr._1 ++ attr._2)
+ val childrenColumnsLineage =
+ // support for the multi-insert statement
+ if (p.output.isEmpty) {
+ p.children
+ .map(extractColumnsLineage(_, ListMap[Attribute,
AttributeSet]()))
+ .reduce(mergeColumnsLineage)
+ } else {
+ // merge all children in to one derivedColumns
+ val childrenUnion =
+ p.children.map(extractColumnsLineage(_, ListMap[Attribute,
AttributeSet]())).map(
+ _.values).reduce {
+ (left, right) =>
+ left.zip(right).map(attr => attr._1 ++ attr._2)
+ }
+ ListMap(p.output.zip(childrenUnion): _*)
}
- val childrenColumnsLineage = ListMap(p.output.zip(childrenUnion): _*)
joinColumnsLineage(parentColumnsLineage, childrenColumnsLineage)
case p: LogicalRelation if p.catalogTable.nonEmpty =>
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index 6180980c8..e94c88f6b 100644
---
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -171,7 +171,7 @@ class SparkSQLLineageParserHelperSuite extends
KyuubiFunSuite
"WHEN MATCHED THEN " +
" UPDATE SET target.name = source.name, target.price = source.price "
+
"WHEN NOT MATCHED THEN " +
- " INSERT (id, name, price) VALUES (source.id, source.name,
source.price)")
+ " INSERT (id, name, price) VALUES (cast(source.id as int),
source.name, source.price)")
assert(ret0 == Lineage(
List("v2_catalog.db.source_t"),
List("v2_catalog.db.target_t"),
@@ -1249,6 +1249,25 @@ class SparkSQLLineageParserHelperSuite extends
KyuubiFunSuite
}
}
+ test("test the statement with FROM xxx INSERT xxx") {
+ withTable("t1", "t2", "t3") { _ =>
+ spark.sql("CREATE TABLE t1 (a string, b string) USING hive")
+ spark.sql("CREATE TABLE t2 (a string, b string) USING hive")
+ spark.sql("CREATE TABLE t3 (a string, b string) USING hive")
+ val ret0 = exectractLineage("from (select a,b from t1)" +
+ " insert overwrite table t2 select a,b where a=1" +
+ " insert overwrite table t3 select a,b where b=1")
+ assert(ret0 == Lineage(
+ List("default.t1"),
+ List("default.t2", "default.t3"),
+ List(
+ ("default.t2.a", Set("default.t1.a")),
+ ("default.t2.b", Set("default.t1.b")),
+ ("default.t3.a", Set("default.t1.a")),
+ ("default.t3.b", Set("default.t1.b")))))
+ }
+ }
+
private def exectractLineageWithoutExecuting(sql: String): Lineage = {
val parsed = spark.sessionState.sqlParser.parsePlan(sql)
val analyzed = spark.sessionState.analyzer.execute(parsed)