This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5185c8d22 [KYUUBI #5533][AUTHZ] Support merge into table command for
Delta Lake
5185c8d22 is described below
commit 5185c8d229a08d646914ffed89b13b4f01f5767f
Author: zml1206 <[email protected]>
AuthorDate: Mon Nov 6 17:37:23 2023 +0800
[KYUUBI #5533][AUTHZ] Support merge into table command for Delta Lake
### _Why are the changes needed?_
To close #5533 .
Support merge into table command for Delta Lake.
https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before make a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5621 from zml1206/KYUUBI-5533.
Closes #5533
71af24a4f [zml1206] Support merge into table command for Delta Lake
Authored-by: zml1206 <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
---
.../src/main/resources/table_command_spec.json | 22 +++++++++++
.../plugin/spark/authz/gen/DeltaCommands.scala | 12 ++++++
.../DeltaCatalogRangerSparkExtensionSuite.scala | 45 ++++++++++++++++++++++
3 files changed, 79 insertions(+)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 9a1c0c34c..b36bce824 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -2029,6 +2029,28 @@
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
+}, {
+ "classname" : "org.apache.spark.sql.delta.commands.MergeIntoCommand",
+ "tableDescs" : [ {
+ "fieldName" : "target",
+ "fieldExtractor" : "SubqueryAliasTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE"
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false
+ } ],
+ "opType" : "QUERY",
+ "queryDescs" : [ {
+ "fieldName" : "source",
+ "fieldExtractor" : "LogicalPlanQueryExtractor"
+ } ],
+ "uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.delta.commands.UpdateCommand",
"tableDescs" : [ {
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
index e8cce67ab..38701760d 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
@@ -42,7 +42,19 @@ object DeltaCommands extends CommandSpecs[TableCommandSpec] {
DeleteCommand.copy(classname = cmd)
}
+ val MergeIntoCommand = {
+ val cmd = "org.apache.spark.sql.delta.commands.MergeIntoCommand"
+ val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+ val tableDesc = TableDesc(
+ "target",
+ classOf[SubqueryAliasTableExtractor],
+ actionTypeDesc = Some(actionTypeDesc))
+ val queryDesc = QueryDesc("source")
+ TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDesc))
+ }
+
override def specs: Seq[TableCommandSpec] = Seq(
DeleteCommand,
+ MergeIntoCommand,
UpdateCommand)
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
index 4fc73adce..04d2262df 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
@@ -221,6 +221,51 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
doAs(admin, sql(updateTableSql))
}
}
+
+ test("merge into table") {
+ withSingleCallEnabled {
+ withCleanTmpResources(Seq(
+ (s"$namespace1.$table1", "table"),
+ (s"$namespace1.$table2", "table"),
+ (s"$namespace1", "database"))) {
+ doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+ doAs(admin, sql(createTableSql(namespace1, table1)))
+ doAs(admin, sql(createTableSql(namespace1, table2)))
+
+ val mergeIntoSql =
+ s"""
+ |MERGE INTO $namespace1.$table1 AS target
+ |USING $namespace1.$table2 AS source
+ |ON target.id = source.id
+ |WHEN MATCHED THEN
+ | UPDATE SET
+ | id = source.id,
+ | name = source.name,
+ | gender = source.gender,
+ | birthDate = source.birthDate
+ |WHEN NOT MATCHED
+ | THEN INSERT (
+ | id,
+ | name,
+ | gender,
+ | birthDate
+ | )
+ | VALUES (
+ | source.id,
+ | source.name,
+ | source.gender,
+ | source.birthDate
+ | )
+ |""".stripMargin
+ interceptContains[AccessControlException](
+ doAs(someone, sql(mergeIntoSql)))(
+ s"does not have [select] privilege on
[$namespace1/$table2/id,$namespace1/$table2/name," +
+ s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+ s" [update] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(mergeIntoSql))
+ }
+ }
+ }
}
object DeltaCatalogRangerSparkExtensionSuite {