This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new abaa3698c [KYUUBI #5447][AUTHZ] Support Hudi
DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand
abaa3698c is described below
commit abaa3698cb49a17a931ee8c4c6ab24e0dddf95e8
Author: Angerszhuuuu <[email protected]>
AuthorDate: Sat Oct 21 22:46:26 2023 +0800
[KYUUBI #5447][AUTHZ] Support Hudi
DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand
### _Why are the changes needed?_
To close #5447. Add Kyuubi AuthZ support for the following Hudi commands:
DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand
- DeleteHoodieTableCommand:
https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
- UpdateHoodieTableCommand:
https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
- MergeIntoHoodieTableCommand:
https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No
Closes #5482 from AngersZhuuuu/KYUUBI-5447.
Closes #5447
2598af203 [Angerszhuuuu] Update HudiCatalogRangerSparkExtensionSuite.scala
08be589b7 [Angerszhuuuu] Update
org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor
19497d12c [Angerszhuuuu] Update tableExtractors.scala
df6e244e2 [Angerszhuuuu] update
1a72f1323 [Angerszhuuuu] update
f7ca6846c [Angerszhuuuu] Merge branch 'master' into KYUUBI-5447
37006869b [Angerszhuuuu] [KYUUBI #5447][AUTHZ] Support hudi
DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: liangbowen <[email protected]>
---
....kyuubi.plugin.spark.authz.serde.QueryExtractor | 1 +
....kyuubi.plugin.spark.authz.serde.TableExtractor | 2 +
.../src/main/resources/table_command_spec.json | 63 ++++++++++++++++++++
.../plugin/spark/authz/serde/queryExtractors.scala | 8 +++
.../plugin/spark/authz/serde/tableExtractors.scala | 32 ++++++++++-
.../plugin/spark/authz/gen/HudiCommands.scala | 40 ++++++++++++-
.../HudiCatalogRangerSparkExtensionSuite.scala | 67 +++++++++++++++++++++-
7 files changed, 209 insertions(+), 4 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor
index c659114f9..2406a40e1 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor
+++
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor
@@ -15,5 +15,6 @@
# limitations under the License.
#
+org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoSourceTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanOptionQueryExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanQueryExtractor
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
index 78f836c65..33c8b8759 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
+++
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
@@ -19,6 +19,8 @@
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableOptionTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ExpressionSeqTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.HudiDataSourceV2RelationTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoTargetTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index c739fe295..1d2b5dc88 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -1604,6 +1604,27 @@
} ],
"opType" : "CREATETABLE",
"queryDescs" : [ ]
+}, {
+ "classname" : "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand",
+ "tableDescs" : [ {
+ "fieldName" : "dft",
+ "fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE"
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false
+ } ],
+ "opType" : "QUERY",
+ "queryDescs" : [ {
+ "fieldName" : "query",
+ "fieldExtractor" : "LogicalPlanQueryExtractor"
+ } ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.DropHoodieTableCommand",
"tableDescs" : [ {
@@ -1643,6 +1664,27 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
+}, {
+ "classname" :
"org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand",
+ "tableDescs" : [ {
+ "fieldName" : "mergeInto",
+ "fieldExtractor" : "HudiMergeIntoTargetTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE"
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false
+ } ],
+ "opType" : "QUERY",
+ "queryDescs" : [ {
+ "fieldName" : "mergeInto",
+ "fieldExtractor" : "HudiMergeIntoSourceTableExtractor"
+ } ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.RepairHoodieTableCommand",
"tableDescs" : [ {
@@ -1705,4 +1747,25 @@
} ],
"opType" : "TRUNCATETABLE",
"queryDescs" : [ ]
+}, {
+ "classname" : "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand",
+ "tableDescs" : [ {
+ "fieldName" : "ut",
+ "fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE"
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false
+ } ],
+ "opType" : "QUERY",
+ "queryDescs" : [ {
+ "fieldName" : "query",
+ "fieldExtractor" : "LogicalPlanQueryExtractor"
+ } ]
} ]
\ No newline at end of file
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala
index f6fc19ac2..4ac87e100 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala
@@ -19,6 +19,8 @@ package org.apache.kyuubi.plugin.spark.authz.serde
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
+
trait QueryExtractor extends (AnyRef => Option[LogicalPlan]) with Extractor
object QueryExtractor {
@@ -44,3 +46,9 @@ class LogicalPlanOptionQueryExtractor extends QueryExtractor {
v1.asInstanceOf[Option[LogicalPlan]]
}
}
+
+class HudiMergeIntoSourceTableExtractor extends QueryExtractor {
+ override def apply(v1: AnyRef): Option[LogicalPlan] = {
+ new LogicalPlanQueryExtractor().apply(invokeAs[LogicalPlan](v1,
"sourceTable"))
+ }
+}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index 57eab9634..47c486af3 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.util.reflect.ReflectUtils._
@@ -80,7 +80,9 @@ class TableIdentifierTableExtractor extends TableExtractor {
val catalogTable =
spark.sessionState.catalog.getTableMetadata(identifier)
Option(catalogTable.owner).filter(_.nonEmpty)
} catch {
- case _: Exception => None
+ case e: Exception =>
+ e.printStackTrace()
+ None
}
Some(Table(None, identifier.database, identifier.table, owner))
}
@@ -240,3 +242,29 @@ class TableTableExtractor extends TableExtractor {
lookupExtractor[StringTableExtractor].apply(spark, tableName)
}
}
+
+class HudiDataSourceV2RelationTableExtractor extends TableExtractor {
+ override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+ invokeAs[LogicalPlan](v1, "table") match {
+ // Match multipartIdentifier with tableAlias
+ case SubqueryAlias(_, SubqueryAlias(identifier, _)) =>
+ new StringTableExtractor().apply(spark, identifier.toString())
+ // Match multipartIdentifier without tableAlias
+ case SubqueryAlias(identifier, _) =>
+ new StringTableExtractor().apply(spark, identifier.toString())
+ }
+ }
+}
+
+class HudiMergeIntoTargetTableExtractor extends TableExtractor {
+ override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+ invokeAs[LogicalPlan](v1, "targetTable") match {
+ // Match multipartIdentifier with tableAlias
+ case SubqueryAlias(_, SubqueryAlias(identifier, relation)) =>
+ new StringTableExtractor().apply(spark, identifier.toString())
+ // Match multipartIdentifier without tableAlias
+ case SubqueryAlias(identifier, _) =>
+ new StringTableExtractor().apply(spark, identifier.toString())
+ }
+ }
+}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
index d7e40237b..522059f27 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.plugin.spark.authz.gen
import org.apache.kyuubi.plugin.spark.authz.OperationType._
+import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
import org.apache.kyuubi.plugin.spark.authz.serde._
import org.apache.kyuubi.plugin.spark.authz.serde.TableType._
@@ -165,6 +166,40 @@ object HudiCommands {
TableCommandSpec(cmd, Seq(tableDesc), SHOWPARTITIONS)
}
+ val DeleteHoodieTableCommand = {
+ val cmd = "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand"
+ val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+ val tableDesc =
+ TableDesc(
+ "dft",
+ classOf[HudiDataSourceV2RelationTableExtractor],
+ actionTypeDesc = Some(actionTypeDesc))
+ TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query")))
+ }
+
+ val UpdateHoodieTableCommand = {
+ val cmd = "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand"
+ val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+ val tableDesc =
+ TableDesc(
+ "ut",
+ classOf[HudiDataSourceV2RelationTableExtractor],
+ actionTypeDesc = Some(actionTypeDesc))
+ TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query")))
+ }
+
+ val MergeIntoHoodieTableCommand = {
+ val cmd = "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand"
+ val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+ val tableDesc =
+ TableDesc(
+ "mergeInto",
+ classOf[HudiMergeIntoTargetTableExtractor],
+ actionTypeDesc = Some(actionTypeDesc))
+ val queryDescs = QueryDesc("mergeInto",
classOf[HudiMergeIntoSourceTableExtractor])
+ TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDescs))
+ }
+
val data: Array[TableCommandSpec] = Array(
AlterHoodieTableAddColumnsCommand,
AlterHoodieTableChangeColumnCommand,
@@ -176,10 +211,13 @@ object HudiCommands {
CreateHoodieTableLikeCommand,
CompactionHoodieTableCommand,
CompactionShowHoodieTableCommand,
+ DeleteHoodieTableCommand,
DropHoodieTableCommand,
InsertIntoHoodieTableCommand,
+ MergeIntoHoodieTableCommand,
RepairHoodieTableCommand,
TruncateHoodieTableCommand,
ShowHoodieTablePartitionsCommand,
- Spark31AlterTableCommand)
+ Spark31AlterTableCommand,
+ UpdateHoodieTableCommand)
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
index 193446bb2..fd7acd129 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
@@ -33,7 +33,7 @@ import org.apache.kyuubi.util.AssertionUtils.interceptContains
*/
@HudiTest
class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
- override protected val catalogImpl: String = "hive"
+ override protected val catalogImpl: String = "in-memory"
// TODO: Apache Hudi not support Spark 3.5 and Scala 2.13 yet,
// should change after Apache Hudi support Spark 3.5 and Scala 2.13.
private def isSupportedVersion = !isSparkV35OrGreater && !isScalaV213
@@ -407,4 +407,69 @@ class HudiCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
}
}
}
+
+
test("DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand")
{
+ withSingleCallEnabled {
+ withCleanTmpResources(Seq(
+ (s"$namespace1.$table1", "table"),
+ (s"$namespace1.$table2", "table"),
+ (namespace1, "database"))) {
+ doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+ doAs(
+ admin,
+ sql(
+ s"""
+ |CREATE TABLE IF NOT EXISTS $namespace1.$table1(id int, name
string, city string)
+ |USING HUDI
+ |OPTIONS (
+ | type = 'cow',
+ | primaryKey = 'id',
+ | 'hoodie.datasource.hive_sync.enable' = 'false'
+ |)
+ |PARTITIONED BY(city)
+ |""".stripMargin))
+
+ doAs(
+ admin,
+ sql(
+ s"""
+ |CREATE TABLE IF NOT EXISTS $namespace1.$table2(id int, name
string, city string)
+ |USING HUDI
+ |OPTIONS (
+ | type = 'cow',
+ | primaryKey = 'id',
+ | 'hoodie.datasource.hive_sync.enable' = 'false'
+ |)
+ |PARTITIONED BY(city)
+ |""".stripMargin))
+
+ val deleteFrom = s"DELETE FROM $namespace1.$table1 WHERE id = 10"
+ interceptContains[AccessControlException] {
+ doAs(someone, sql(deleteFrom))
+ }(s"does not have [update] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(deleteFrom))
+
+ val updateSql = s"UPDATE $namespace1.$table1 SET name = 'test' WHERE
id > 10"
+ interceptContains[AccessControlException] {
+ doAs(someone, sql(updateSql))
+ }(s"does not have [update] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(updateSql))
+
+ val mergeIntoSQL =
+ s"""
+ |MERGE INTO $namespace1.$table1 target
+ |USING $namespace1.$table2 source
+ |ON target.id = source.id
+ |WHEN MATCHED
+ |AND target.name == 'test'
+ | THEN UPDATE SET id = source.id, name = source.name, city =
source.city
+ |""".stripMargin
+ interceptContains[AccessControlException] {
+ doAs(someone, sql(mergeIntoSQL))
+ }(s"does not have [select] privilege on " +
+
s"[$namespace1/$table2/id,$namespace1/$table2/name,$namespace1/$table2/city]")
+ doAs(admin, sql(mergeIntoSQL))
+ }
+ }
+ }
}