This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 5802a163c [KYUUBI #4331] [KYUUBI #4431] Lineage supports `Union` and 
`MergeInto`
5802a163c is described below

commit 5802a163c120441d49b10fdbbae8e922c5c16977
Author: odone <[email protected]>
AuthorDate: Fri Mar 17 13:13:16 2023 +0800

    [KYUUBI #4331] [KYUUBI #4431] Lineage supports `Union` and `MergeInto`
    
    close #4331
    close #4431
    
    ### _Why are the changes needed?_
    
    Because `optimizedPlan` has been replaced by `analyzedPlan`, Iceberg tables 
no longer need additional processing when new logical plans are generated 
during certain optimization stages; they can be treated as regular tables.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #4543 from iodone/fix-4431.
    
    Closes #4331
    
    Closes #4431
    
    2adddcc5 [odone] fix `Union` and `MergeInto` bug
    
    Authored-by: odone <[email protected]>
    Signed-off-by: ulyssesyou <[email protected]>
    (cherry picked from commit 04155f2d690552a038980d022363ef1d7c36fc5c)
    Signed-off-by: ulyssesyou <[email protected]>
---
 .../helper/SparkSQLLineageParseHelper.scala        | 24 ++++++++++++++--------
 .../helper/SparkSQLLineageParserHelperSuite.scala  | 21 ++++++++++++++++++-
 2 files changed, 36 insertions(+), 9 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index 1224da023..f78910aed 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -311,7 +311,7 @@ trait LineageParser {
         val nextColumnsLlineage = ListMap(allAssignments.map { assignment =>
           (
             assignment.key.asInstanceOf[Attribute],
-            AttributeSet(assignment.value.asInstanceOf[Attribute]))
+            assignment.value.references)
         }: _*)
         val targetTable = getPlanField[LogicalPlan]("targetTable", plan)
         val sourceTable = getPlanField[LogicalPlan]("sourceTable", plan)
@@ -370,14 +370,22 @@ trait LineageParser {
         }
 
       case p: Union =>
-        // merge all children in to one derivedColumns
-        val childrenUnion =
-          p.children.map(extractColumnsLineage(_, ListMap[Attribute, 
AttributeSet]())).map(
-            _.values).reduce {
-            (left, right) =>
-              left.zip(right).map(attr => attr._1 ++ attr._2)
+        val childrenColumnsLineage =
+          // support for the multi-insert statement
+          if (p.output.isEmpty) {
+            p.children
+              .map(extractColumnsLineage(_, ListMap[Attribute, 
AttributeSet]()))
+              .reduce(mergeColumnsLineage)
+          } else {
+            // merge all children in to one derivedColumns
+            val childrenUnion =
+              p.children.map(extractColumnsLineage(_, ListMap[Attribute, 
AttributeSet]())).map(
+                _.values).reduce {
+                (left, right) =>
+                  left.zip(right).map(attr => attr._1 ++ attr._2)
+              }
+            ListMap(p.output.zip(childrenUnion): _*)
           }
-        val childrenColumnsLineage = ListMap(p.output.zip(childrenUnion): _*)
         joinColumnsLineage(parentColumnsLineage, childrenColumnsLineage)
 
       case p: LogicalRelation if p.catalogTable.nonEmpty =>
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index 6180980c8..e94c88f6b 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -171,7 +171,7 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
         "WHEN MATCHED THEN " +
         "  UPDATE SET target.name = source.name, target.price = source.price " 
+
         "WHEN NOT MATCHED THEN " +
-        "  INSERT (id, name, price) VALUES (source.id, source.name, 
source.price)")
+        "  INSERT (id, name, price) VALUES (cast(source.id as int), 
source.name, source.price)")
       assert(ret0 == Lineage(
         List("v2_catalog.db.source_t"),
         List("v2_catalog.db.target_t"),
@@ -1249,6 +1249,25 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
     }
   }
 
+  test("test the statement with FROM xxx INSERT xxx") {
+    withTable("t1", "t2", "t3") { _ =>
+      spark.sql("CREATE TABLE t1 (a string, b string) USING hive")
+      spark.sql("CREATE TABLE t2 (a string, b string) USING hive")
+      spark.sql("CREATE TABLE t3 (a string, b string) USING hive")
+      val ret0 = exectractLineage("from (select a,b from t1)" +
+        " insert overwrite table t2 select a,b where a=1" +
+        " insert overwrite table t3 select a,b where b=1")
+      assert(ret0 == Lineage(
+        List("default.t1"),
+        List("default.t2", "default.t3"),
+        List(
+          ("default.t2.a", Set("default.t1.a")),
+          ("default.t2.b", Set("default.t1.b")),
+          ("default.t3.a", Set("default.t1.a")),
+          ("default.t3.b", Set("default.t1.b")))))
+    }
+  }
+
   private def exectractLineageWithoutExecuting(sql: String): Lineage = {
     val parsed = spark.sessionState.sqlParser.parsePlan(sql)
     val analyzed = spark.sessionState.analyzer.execute(parsed)

Reply via email to