This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5185c8d22 [KYUUBI #5533][AUTHZ] Support merge into table command for Delta Lake
5185c8d22 is described below

commit 5185c8d229a08d646914ffed89b13b4f01f5767f
Author: zml1206 <[email protected]>
AuthorDate: Mon Nov 6 17:37:23 2023 +0800

    [KYUUBI #5533][AUTHZ] Support merge into table command for Delta Lake
    
    ### _Why are the changes needed?_
    To close #5533 .
    Support merge into table command for Delta Lake.
    
    https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    No.
    
    Closes #5621 from zml1206/KYUUBI-5533.
    
    Closes #5533
    
    71af24a4f [zml1206] Support merge into table command for Delta Lake
    
    Authored-by: zml1206 <[email protected]>
    Signed-off-by: Bowen Liang <[email protected]>
---
 .../src/main/resources/table_command_spec.json     | 22 +++++++++++
 .../plugin/spark/authz/gen/DeltaCommands.scala     | 12 ++++++
 .../DeltaCatalogRangerSparkExtensionSuite.scala    | 45 ++++++++++++++++++++++
 3 files changed, 79 insertions(+)

diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 9a1c0c34c..b36bce824 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -2029,6 +2029,28 @@
   "opType" : "QUERY",
   "queryDescs" : [ ],
   "uriDescs" : [ ]
+}, {
+  "classname" : "org.apache.spark.sql.delta.commands.MergeIntoCommand",
+  "tableDescs" : [ {
+    "fieldName" : "target",
+    "fieldExtractor" : "SubqueryAliasTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ {
+    "fieldName" : "source",
+    "fieldExtractor" : "LogicalPlanQueryExtractor"
+  } ],
+  "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.delta.commands.UpdateCommand",
   "tableDescs" : [ {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
index e8cce67ab..38701760d 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
@@ -42,7 +42,19 @@ object DeltaCommands extends CommandSpecs[TableCommandSpec] {
     DeleteCommand.copy(classname = cmd)
   }
 
+  val MergeIntoCommand = {
+    val cmd = "org.apache.spark.sql.delta.commands.MergeIntoCommand"
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val tableDesc = TableDesc(
+      "target",
+      classOf[SubqueryAliasTableExtractor],
+      actionTypeDesc = Some(actionTypeDesc))
+    val queryDesc = QueryDesc("source")
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDesc))
+  }
+
   override def specs: Seq[TableCommandSpec] = Seq(
     DeleteCommand,
+    MergeIntoCommand,
     UpdateCommand)
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
index 4fc73adce..04d2262df 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
@@ -221,6 +221,51 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       doAs(admin, sql(updateTableSql))
     }
   }
+
+  test("merge into table") {
+    withSingleCallEnabled {
+      withCleanTmpResources(Seq(
+        (s"$namespace1.$table1", "table"),
+        (s"$namespace1.$table2", "table"),
+        (s"$namespace1", "database"))) {
+        doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+        doAs(admin, sql(createTableSql(namespace1, table1)))
+        doAs(admin, sql(createTableSql(namespace1, table2)))
+
+        val mergeIntoSql =
+          s"""
+             |MERGE INTO $namespace1.$table1 AS target
+             |USING $namespace1.$table2 AS source
+             |ON target.id = source.id
+             |WHEN MATCHED THEN
+             |  UPDATE SET
+             |    id = source.id,
+             |    name = source.name,
+             |    gender = source.gender,
+             |    birthDate = source.birthDate
+             |WHEN NOT MATCHED
+             |  THEN INSERT (
+             |    id,
+             |    name,
+             |    gender,
+             |    birthDate
+             |  )
+             |  VALUES (
+             |    source.id,
+             |    source.name,
+             |    source.gender,
+             |    source.birthDate
+             |  )
+             |""".stripMargin
+        interceptContains[AccessControlException](
+          doAs(someone, sql(mergeIntoSql)))(
+          s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," +
+            s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+            s" [update] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(mergeIntoSql))
+      }
+    }
+  }
 }
 
 object DeltaCatalogRangerSparkExtensionSuite {

Reply via email to