This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new 3f9c6761e [KYUUBI #4916] [AUTHZ] Support ReplaceData and compatible
Spark 3.4 and 3.5
3f9c6761e is described below
commit 3f9c6761ebb0fd4428ae0fca9b2d0c4ad2bdcb9f
Author: Bowen Liang <[email protected]>
AuthorDate: Tue Oct 17 13:04:15 2023 +0800
[KYUUBI #4916] [AUTHZ] Support ReplaceData and compatible Spark 3.4 and 3.5
- Iceberg 1.3.0 uses Spark's `ReplaceData` since 3.4 as the logical plan for
"DELETE FROM", instead of Iceberg's `DeleteFromTable` in Spark 3.4
- Require the select privilege on the input table, even if it is the same as
the output table
- compatible with Spark 3.4 and 3.5, subqueries in the plan
- enable Iceberg tests for the authz plugin on Spark 3.4 and 3.5
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4916 from bowenliang123/authz-replacedata.
Closes #4916
658917752 [Bowen Liang] Merge branch 'master' into authz-replacedata
17f5d84dd [liangbowen] update
25009908a [Bowen Liang] ut of user with select only for DELETE FROM
fa77cea15 [Bowen Liang] ut
fd9bb8f3a [Bowen Liang] update
f9cfb98a9 [Bowen Liang] assertion for Spark 3.5
f574e0da3 [Bowen Liang] assertion for Spark 3.5
2449c27de [Bowen Liang] bring back single call ut for someone
78786988d [Bowen Liang] bring back single call ut for someone
b8e4a6319 [wforget] fix style
0e26c08b4 [wforget] fix IcebergCatalogRangerSparkExtensionSuite with
spark-3.5
02781781f [wforget] enable iceberg tests in spark-3.5
215e1b861 [wforget] fix TableCommands
d019b1632 [wforget] followup
ae17e076b [wforget] Merge remote-tracking branch 'origin/master' into
authz-replacedata
b88f77355 [Bowen Liang] update
febcb3ee5 [Bowen Liang] isSparkV34OrGreater
91d41b438 [Bowen Liang] replace data
Lead-authored-by: Bowen Liang <[email protected]>
Co-authored-by: wforget <[email protected]>
Co-authored-by: Bowen Liang <[email protected]>
Co-authored-by: liangbowen <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
---
.../src/main/resources/table_command_spec.json | 21 +++++++
.../plugin/spark/authz/util/AuthZUtils.scala | 2 +
.../IcebergCatalogPrivilegesBuilderSuite.scala | 36 ++++++++++--
.../spark/authz/V2CommandsPrivilegesSuite.scala | 12 +++-
.../plugin/spark/authz/gen/TableCommands.scala | 12 ++++
.../IcebergCatalogRangerSparkExtensionSuite.scala | 64 ++++++++++++++--------
pom.xml | 4 +-
7 files changed, 120 insertions(+), 31 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index dad13baa1..46f50f706 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -456,6 +456,27 @@
} ],
"opType" : "ALTERTABLE_REPLACECOLS",
"queryDescs" : [ ]
+}, {
+ "classname" : "org.apache.spark.sql.catalyst.plans.logical.ReplaceData",
+ "tableDescs" : [ {
+ "fieldName" : "originalTable",
+ "fieldExtractor" : "DataSourceV2RelationTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE"
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false
+ } ],
+ "opType" : "QUERY",
+ "queryDescs" : [ {
+ "fieldName" : "query",
+ "fieldExtractor" : "LogicalPlanQueryExtractor"
+ } ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.ReplaceTable",
"tableDescs" : [ {
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index 4f7cbb9ef..e7ae54a44 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -87,6 +87,8 @@ private[authz] object AuthZUtils {
lazy val isSparkV31OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.1"
lazy val isSparkV32OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.2"
lazy val isSparkV33OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.3"
+ lazy val isSparkV34OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.4"
+ lazy val isSparkV35OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.5"
def quoteIfNeeded(part: String): String = {
if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
index db81cd6e8..d06f2a9d8 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
@@ -63,7 +63,15 @@ class IcebergCatalogPrivilegesBuilderSuite extends
V2CommandsPrivilegesSuite {
val plan = sql(s"DELETE FROM $catalogTable WHERE key = 1
").queryExecution.analyzed
val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, spark)
assert(operationType === QUERY)
- assert(inputs.isEmpty)
+ if (isSparkV34OrGreater) {
+ assert(inputs.size === 1)
+ val po = inputs.head
+ assertEqualsIgnoreCase(namespace)(po.dbname)
+ assertEqualsIgnoreCase(catalogTableShort)(po.objectName)
+ assertContains(po.columns, "key", "value")
+ } else {
+ assert(inputs.size === 0)
+ }
assert(outputs.size === 1)
val po = outputs.head
assert(po.actionType === PrivilegeObjectActionType.UPDATE)
@@ -80,7 +88,15 @@ class IcebergCatalogPrivilegesBuilderSuite extends
V2CommandsPrivilegesSuite {
val plan = sql(s"UPDATE $catalogTable SET value = 'b' WHERE key = 1
").queryExecution.analyzed
val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, spark)
assert(operationType === QUERY)
- assert(inputs.isEmpty)
+ if (isSparkV35OrGreater) {
+ assert(inputs.size === 1)
+ val po = inputs.head
+ assertEqualsIgnoreCase(namespace)(po.dbname)
+ assertEqualsIgnoreCase(catalogTableShort)(po.objectName)
+ assertContains(po.columns, "key", "value")
+ } else {
+ assert(inputs.size === 0)
+ }
assert(outputs.size === 1)
val po = outputs.head
assert(po.actionType === PrivilegeObjectActionType.UPDATE)
@@ -104,8 +120,20 @@ class IcebergCatalogPrivilegesBuilderSuite extends
V2CommandsPrivilegesSuite {
s"WHEN NOT MATCHED THEN INSERT *").queryExecution.analyzed
val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan,
spark)
assert(operationType === QUERY)
- assert(inputs.size === 1)
- val po0 = inputs.head
+ if (isSparkV35OrGreater) {
+ assert(inputs.size === 2)
+ val po = inputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assertEqualsIgnoreCase(namespace)(po.dbname)
+ assertEqualsIgnoreCase(table)(po.objectName)
+ assertContains(po.columns, "key", "value")
+ // The properties of RowLevelOperationTable are empty, so owner is none
+ assert(po.owner.isEmpty)
+ } else {
+ assert(inputs.size === 1)
+ }
+ val po0 = inputs.last
assert(po0.actionType === PrivilegeObjectActionType.OTHER)
assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
assertEqualsIgnoreCase(namespace)(po0.dbname)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
index 3ebea1ce9..149c9ba8f 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
@@ -161,7 +161,11 @@ abstract class V2CommandsPrivilegesSuite extends
PrivilegesBuilderSuite {
assertEqualsIgnoreCase(namespace)(po.dbname)
assertEqualsIgnoreCase(table)(po.objectName)
assert(po.columns.isEmpty)
- assert(po.owner.isEmpty)
+ if (isSparkV34OrGreater) {
+ checkV2TableOwner(po)
+ } else {
+ assert(po.owner.isEmpty)
+ }
val accessType = AccessType(po, operationType, isInput = false)
assert(accessType === AccessType.CREATE)
}
@@ -193,7 +197,11 @@ abstract class V2CommandsPrivilegesSuite extends
PrivilegesBuilderSuite {
assertEqualsIgnoreCase(namespace)(po.dbname)
assertEqualsIgnoreCase(table)(po.objectName)
assert(po.columns.isEmpty)
- assert(po.owner.isEmpty)
+ if (isSparkV34OrGreater) {
+ checkV2TableOwner(po)
+ } else {
+ assert(po.owner.isEmpty)
+ }
val accessType = AccessType(po, operationType, isInput = false)
assert(accessType === AccessType.CREATE)
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
index 6a6800210..cf73cfbc6 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
@@ -257,6 +257,17 @@ object TableCommands {
TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
}
+ val ReplaceData = {
+ val cmd = "org.apache.spark.sql.catalyst.plans.logical.ReplaceData"
+ val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+ val tableDesc =
+ TableDesc(
+ "originalTable",
+ classOf[DataSourceV2RelationTableExtractor],
+ actionTypeDesc = Some(actionTypeDesc))
+ TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+ }
+
val UpdateTable = {
val cmd = "org.apache.spark.sql.catalyst.plans.logical.UpdateTable"
val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
@@ -655,6 +666,7 @@ object TableCommands {
RefreshTable,
RefreshTableV2,
RefreshTable3d0,
+ ReplaceData,
ShowColumns,
ShowCreateTable,
ShowCreateTable.copy(classname =
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
index b22a812fd..64834b24d 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -16,17 +16,18 @@
*/
package org.apache.kyuubi.plugin.spark.authz.ranger
-// scalastyle:off
import scala.util.Try
import org.scalatest.Outcome
+// scalastyle:off
import org.apache.kyuubi.Utils
import org.apache.kyuubi.plugin.spark.authz.AccessControlException
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.tags.IcebergTest
+import org.apache.kyuubi.util.AssertionUtils._
/**
* Tests for RangerSparkExtensionSuite
@@ -36,14 +37,14 @@ import org.apache.kyuubi.tags.IcebergTest
class IcebergCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
override protected val catalogImpl: String = "hive"
override protected val sqlExtensions: String =
- if (isSparkV31OrGreater)
- "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
- else ""
+ "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
val catalogV2 = "local"
val namespace1 = icebergNamespace
val table1 = "table1"
val outputTable1 = "outputTable1"
+ val bobNamespace = "default_bob"
+ val bobSelectTable = "table_select_bob_1"
override def withFixture(test: NoArgTest): Outcome = {
assume(isSparkV31OrGreater)
@@ -76,6 +77,11 @@ class IcebergCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite
admin,
sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$outputTable1"
+
" (id int, name string, city string) USING iceberg"))
+
+ doAs(
+ admin,
+ sql(s"CREATE TABLE IF NOT EXISTS
$catalogV2.$bobNamespace.$bobSelectTable" +
+ " (id int, name string, city string) USING iceberg"))
}
}
@@ -104,14 +110,19 @@ class IcebergCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite
s" on [$namespace1/$table1/id]"))
withSingleCallEnabled {
- val e2 = intercept[AccessControlException](
- doAs(
- someone,
- sql(mergeIntoSql)))
- assert(e2.getMessage.contains(s"does not have" +
- s" [select] privilege" +
- s" on
[$namespace1/$table1/id,$namespace1/table1/name,$namespace1/$table1/city]," +
- s" [update] privilege on [$namespace1/$outputTable1]"))
+ interceptContains[AccessControlException](doAs(someone,
sql(mergeIntoSql)))(
+ if (isSparkV35OrGreater) {
+ s"does not have [select] privilege on [$namespace1/table1/id" +
+ s",$namespace1/$table1/name,$namespace1/$table1/city]"
+ } else {
+ "does not have " +
+ s"[select] privilege on
[$namespace1/$table1/id,$namespace1/$table1/name,$namespace1/$table1/city]," +
+ s" [update] privilege on [$bobNamespace/$bobSelectTable]"
+ })
+
+ interceptContains[AccessControlException] {
+ doAs(bob, sql(mergeIntoSql))
+ }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
}
doAs(admin, sql(mergeIntoSql))
@@ -119,13 +130,13 @@ class IcebergCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite
test("[KYUUBI #3515] UPDATE TABLE") {
// UpdateTable
- val e1 = intercept[AccessControlException](
- doAs(
- someone,
- sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
- " WHERE id=1")))
- assert(e1.getMessage.contains(s"does not have [update] privilege" +
- s" on [$namespace1/$table1]"))
+ interceptContains[AccessControlException] {
+ doAs(someone, sql(s"UPDATE $catalogV2.$namespace1.$table1 SET
city='Guangzhou' WHERE id=1"))
+ }(if (isSparkV35OrGreater) {
+ s"does not have [select] privilege on [$namespace1/$table1/id]"
+ } else {
+ s"does not have [update] privilege on [$namespace1/$table1]"
+ })
doAs(
admin,
@@ -135,10 +146,17 @@ class IcebergCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite
test("[KYUUBI #3515] DELETE FROM TABLE") {
// DeleteFromTable
- val e6 = intercept[AccessControlException](
- doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE
id=2")))
- assert(e6.getMessage.contains(s"does not have [update] privilege" +
- s" on [$namespace1/$table1]"))
+ interceptContains[AccessControlException] {
+ doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE
id=2"))
+ }(if (isSparkV34OrGreater) {
+ s"does not have [select] privilege on [$namespace1/$table1/id]"
+ } else {
+ s"does not have [update] privilege on [$namespace1/$table1]"
+ })
+
+ interceptContains[AccessControlException] {
+ doAs(bob, sql(s"DELETE FROM $catalogV2.$bobNamespace.$bobSelectTable
WHERE id=2"))
+ }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
doAs(admin, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
}
diff --git a/pom.xml b/pom.xml
index 97be4c167..7cd2d42f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2239,7 +2239,7 @@
<delta.version>2.4.0</delta.version>
<spark.version>3.4.1</spark.version>
<spark.binary.version>3.4</spark.binary.version>
-
<maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow,org.apache.kyuubi.tags.IcebergTest</maven.plugin.scalatest.exclude.tags>
+
<maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow</maven.plugin.scalatest.exclude.tags>
</properties>
</profile>
@@ -2252,7 +2252,7 @@
<delta.version>2.4.0</delta.version>
<spark.version>3.5.0</spark.version>
<spark.binary.version>3.5</spark.binary.version>
-
<maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PySparkTest</maven.plugin.scalatest.exclude.tags>
+
<maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.PySparkTest</maven.plugin.scalatest.exclude.tags>
</properties>
</profile>