This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new abaa3698c [KYUUBI #5447][AUTHZ] Support Hudi 
DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand
abaa3698c is described below

commit abaa3698cb49a17a931ee8c4c6ab24e0dddf95e8
Author: Angerszhuuuu <[email protected]>
AuthorDate: Sat Oct 21 22:46:26 2023 +0800

    [KYUUBI #5447][AUTHZ] Support Hudi 
DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand
    
    ### _Why are the changes needed?_
    To close #5447. Add Kyuubi AuthZ support for the Hudi commands 
DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand.
    
    - DeleteHoodieTableCommand: 
https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
    - UpdateHoodieTableCommand: 
https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
    - MergeIntoHoodieTableCommand: 
https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    No
    
    Closes #5482 from AngersZhuuuu/KYUUBI-5447.
    
    Closes #5447
    
    2598af203 [Angerszhuuuu] Update HudiCatalogRangerSparkExtensionSuite.scala
    08be589b7 [Angerszhuuuu] Update 
org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor
    19497d12c [Angerszhuuuu] Update tableExtractors.scala
    df6e244e2 [Angerszhuuuu] update
    1a72f1323 [Angerszhuuuu] update
    f7ca6846c [Angerszhuuuu] Merge branch 'master' into KYUUBI-5447
    37006869b [Angerszhuuuu] [KYUUBI #5447][AUTHZ] Support hudi 
DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: liangbowen <[email protected]>
---
 ....kyuubi.plugin.spark.authz.serde.QueryExtractor |  1 +
 ....kyuubi.plugin.spark.authz.serde.TableExtractor |  2 +
 .../src/main/resources/table_command_spec.json     | 63 ++++++++++++++++++++
 .../plugin/spark/authz/serde/queryExtractors.scala |  8 +++
 .../plugin/spark/authz/serde/tableExtractors.scala | 32 ++++++++++-
 .../plugin/spark/authz/gen/HudiCommands.scala      | 40 ++++++++++++-
 .../HudiCatalogRangerSparkExtensionSuite.scala     | 67 +++++++++++++++++++++-
 7 files changed, 209 insertions(+), 4 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor
 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor
index c659114f9..2406a40e1 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor
@@ -15,5 +15,6 @@
 # limitations under the License.
 #
 
+org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoSourceTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanOptionQueryExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanQueryExtractor
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
index 78f836c65..33c8b8759 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
@@ -19,6 +19,8 @@ 
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableOptionTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.ExpressionSeqTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.HudiDataSourceV2RelationTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoTargetTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index c739fe295..1d2b5dc88 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -1604,6 +1604,27 @@
   } ],
   "opType" : "CREATETABLE",
   "queryDescs" : [ ]
+}, {
+  "classname" : "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand",
+  "tableDescs" : [ {
+    "fieldName" : "dft",
+    "fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ {
+    "fieldName" : "query",
+    "fieldExtractor" : "LogicalPlanQueryExtractor"
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.hudi.command.DropHoodieTableCommand",
   "tableDescs" : [ {
@@ -1643,6 +1664,27 @@
     "fieldName" : "query",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ]
+}, {
+  "classname" : 
"org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand",
+  "tableDescs" : [ {
+    "fieldName" : "mergeInto",
+    "fieldExtractor" : "HudiMergeIntoTargetTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ {
+    "fieldName" : "mergeInto",
+    "fieldExtractor" : "HudiMergeIntoSourceTableExtractor"
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.hudi.command.RepairHoodieTableCommand",
   "tableDescs" : [ {
@@ -1705,4 +1747,25 @@
   } ],
   "opType" : "TRUNCATETABLE",
   "queryDescs" : [ ]
+}, {
+  "classname" : "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand",
+  "tableDescs" : [ {
+    "fieldName" : "ut",
+    "fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ {
+    "fieldName" : "query",
+    "fieldExtractor" : "LogicalPlanQueryExtractor"
+  } ]
 } ]
\ No newline at end of file
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala
index f6fc19ac2..4ac87e100 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala
@@ -19,6 +19,8 @@ package org.apache.kyuubi.plugin.spark.authz.serde
 
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
+import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
+
 trait QueryExtractor extends (AnyRef => Option[LogicalPlan]) with Extractor
 
 object QueryExtractor {
@@ -44,3 +46,9 @@ class LogicalPlanOptionQueryExtractor extends QueryExtractor {
     v1.asInstanceOf[Option[LogicalPlan]]
   }
 }
+
+class HudiMergeIntoSourceTableExtractor extends QueryExtractor {
+  override def apply(v1: AnyRef): Option[LogicalPlan] = {
+    new LogicalPlanQueryExtractor().apply(invokeAs[LogicalPlan](v1, 
"sourceTable"))
+  }
+}
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index 57eab9634..47c486af3 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 import org.apache.kyuubi.util.reflect.ReflectUtils._
@@ -80,7 +80,9 @@ class TableIdentifierTableExtractor extends TableExtractor {
         val catalogTable = 
spark.sessionState.catalog.getTableMetadata(identifier)
         Option(catalogTable.owner).filter(_.nonEmpty)
       } catch {
-        case _: Exception => None
+        case e: Exception =>
+          e.printStackTrace()
+          None
       }
     Some(Table(None, identifier.database, identifier.table, owner))
   }
@@ -240,3 +242,29 @@ class TableTableExtractor extends TableExtractor {
     lookupExtractor[StringTableExtractor].apply(spark, tableName)
   }
 }
+
+class HudiDataSourceV2RelationTableExtractor extends TableExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+    invokeAs[LogicalPlan](v1, "table") match {
+      // Match multipartIdentifier with tableAlias
+      case SubqueryAlias(_, SubqueryAlias(identifier, _)) =>
+        new StringTableExtractor().apply(spark, identifier.toString())
+      // Match multipartIdentifier without tableAlias
+      case SubqueryAlias(identifier, _) =>
+        new StringTableExtractor().apply(spark, identifier.toString())
+    }
+  }
+}
+
+class HudiMergeIntoTargetTableExtractor extends TableExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+    invokeAs[LogicalPlan](v1, "targetTable") match {
+      // Match multipartIdentifier with tableAlias
+      case SubqueryAlias(_, SubqueryAlias(identifier, relation)) =>
+        new StringTableExtractor().apply(spark, identifier.toString())
+      // Match multipartIdentifier without tableAlias
+      case SubqueryAlias(identifier, _) =>
+        new StringTableExtractor().apply(spark, identifier.toString())
+    }
+  }
+}
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
index d7e40237b..522059f27 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.plugin.spark.authz.gen
 
 import org.apache.kyuubi.plugin.spark.authz.OperationType._
+import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
 import org.apache.kyuubi.plugin.spark.authz.serde._
 import org.apache.kyuubi.plugin.spark.authz.serde.TableType._
 
@@ -165,6 +166,40 @@ object HudiCommands {
     TableCommandSpec(cmd, Seq(tableDesc), SHOWPARTITIONS)
   }
 
+  val DeleteHoodieTableCommand = {
+    val cmd = "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand"
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val tableDesc =
+      TableDesc(
+        "dft",
+        classOf[HudiDataSourceV2RelationTableExtractor],
+        actionTypeDesc = Some(actionTypeDesc))
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query")))
+  }
+
+  val UpdateHoodieTableCommand = {
+    val cmd = "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand"
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val tableDesc =
+      TableDesc(
+        "ut",
+        classOf[HudiDataSourceV2RelationTableExtractor],
+        actionTypeDesc = Some(actionTypeDesc))
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query")))
+  }
+
+  val MergeIntoHoodieTableCommand = {
+    val cmd = "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand"
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val tableDesc =
+      TableDesc(
+        "mergeInto",
+        classOf[HudiMergeIntoTargetTableExtractor],
+        actionTypeDesc = Some(actionTypeDesc))
+    val queryDescs = QueryDesc("mergeInto", 
classOf[HudiMergeIntoSourceTableExtractor])
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDescs))
+  }
+
   val data: Array[TableCommandSpec] = Array(
     AlterHoodieTableAddColumnsCommand,
     AlterHoodieTableChangeColumnCommand,
@@ -176,10 +211,13 @@ object HudiCommands {
     CreateHoodieTableLikeCommand,
     CompactionHoodieTableCommand,
     CompactionShowHoodieTableCommand,
+    DeleteHoodieTableCommand,
     DropHoodieTableCommand,
     InsertIntoHoodieTableCommand,
+    MergeIntoHoodieTableCommand,
     RepairHoodieTableCommand,
     TruncateHoodieTableCommand,
     ShowHoodieTablePartitionsCommand,
-    Spark31AlterTableCommand)
+    Spark31AlterTableCommand,
+    UpdateHoodieTableCommand)
 }
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
index 193446bb2..fd7acd129 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
@@ -33,7 +33,7 @@ import org.apache.kyuubi.util.AssertionUtils.interceptContains
  */
 @HudiTest
 class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
-  override protected val catalogImpl: String = "hive"
+  override protected val catalogImpl: String = "in-memory"
   // TODO: Apache Hudi not support Spark 3.5 and Scala 2.13 yet,
   //  should change after Apache Hudi support Spark 3.5 and Scala 2.13.
   private def isSupportedVersion = !isSparkV35OrGreater && !isScalaV213
@@ -407,4 +407,69 @@ class HudiCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite {
       }
     }
   }
+
+  
test("DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand")
 {
+    withSingleCallEnabled {
+      withCleanTmpResources(Seq(
+        (s"$namespace1.$table1", "table"),
+        (s"$namespace1.$table2", "table"),
+        (namespace1, "database"))) {
+        doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+        doAs(
+          admin,
+          sql(
+            s"""
+               |CREATE TABLE IF NOT EXISTS $namespace1.$table1(id int, name 
string, city string)
+               |USING HUDI
+               |OPTIONS (
+               | type = 'cow',
+               | primaryKey = 'id',
+               | 'hoodie.datasource.hive_sync.enable' = 'false'
+               |)
+               |PARTITIONED BY(city)
+               |""".stripMargin))
+
+        doAs(
+          admin,
+          sql(
+            s"""
+               |CREATE TABLE IF NOT EXISTS $namespace1.$table2(id int, name 
string, city string)
+               |USING HUDI
+               |OPTIONS (
+               | type = 'cow',
+               | primaryKey = 'id',
+               | 'hoodie.datasource.hive_sync.enable' = 'false'
+               |)
+               |PARTITIONED BY(city)
+               |""".stripMargin))
+
+        val deleteFrom = s"DELETE FROM $namespace1.$table1 WHERE id = 10"
+        interceptContains[AccessControlException] {
+          doAs(someone, sql(deleteFrom))
+        }(s"does not have [update] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(deleteFrom))
+
+        val updateSql = s"UPDATE $namespace1.$table1 SET name = 'test' WHERE 
id > 10"
+        interceptContains[AccessControlException] {
+          doAs(someone, sql(updateSql))
+        }(s"does not have [update] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(updateSql))
+
+        val mergeIntoSQL =
+          s"""
+             |MERGE INTO $namespace1.$table1 target
+             |USING $namespace1.$table2 source
+             |ON target.id = source.id
+             |WHEN MATCHED
+             |AND target.name == 'test'
+             | THEN UPDATE SET id = source.id, name = source.name, city = 
source.city
+             |""".stripMargin
+        interceptContains[AccessControlException] {
+          doAs(someone, sql(mergeIntoSQL))
+        }(s"does not have [select] privilege on " +
+          
s"[$namespace1/$table2/id,$namespace1/$table2/name,$namespace1/$table2/city]")
+        doAs(admin, sql(mergeIntoSQL))
+      }
+    }
+  }
 }

Reply via email to