This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 3f9c6761e [KYUUBI #4916] [AUTHZ] Support ReplaceData and compatible Spark 3.4 and 3.5
3f9c6761e is described below

commit 3f9c6761ebb0fd4428ae0fca9b2d0c4ad2bdcb9f
Author: Bowen Liang <[email protected]>
AuthorDate: Tue Oct 17 13:04:15 2023 +0800

    [KYUUBI #4916] [AUTHZ] Support ReplaceData and compatible Spark 3.4 and 3.5
    
    - Since Spark 3.4, Iceberg 1.3.0 uses Spark's `ReplaceData` as the logical plan for "DELETE FROM", instead of Iceberg's own `DeleteFromTable`
    - Require the select privilege on the input table, even if it is the same as the output table (sketched below)
    - Keep compatibility with Spark 3.4 and 3.5, including subqueries in the plan
    - Enable the Iceberg tests for the authz plugin on Spark 3.4 and 3.5
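
    A minimal sketch of the expected checks after this change (assuming the
    `doAs`/`interceptContains` helpers and the `someone`/`bob` test users from
    the authz test suites; the catalog, namespace, and table names below are
    illustrative):

        // Since Spark 3.4, DELETE FROM on an Iceberg table is planned as
        // ReplaceData, whose `query` scans the original table, so a select
        // privilege check now applies to the input side.
        interceptContains[AccessControlException] {
          doAs(someone, sql("DELETE FROM local.default.table1 WHERE id = 2"))
        }("does not have [select] privilege on [default/table1/id]")

        // A user with select-only access passes the input check but is still
        // rejected for the update privilege on the (same) output table.
        interceptContains[AccessControlException] {
          doAs(bob, sql("DELETE FROM local.default_bob.table_select_bob_1 WHERE id = 2"))
        }("does not have [update] privilege on [default_bob/table_select_bob_1]")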
    
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4916 from bowenliang123/authz-replacedata.
    
    Closes #4916
    
    658917752 [Bowen Liang] Merge branch 'master' into authz-replacedata
    17f5d84dd [liangbowen] update
    25009908a [Bowen Liang] ut of user with select only for DELETE FROM
    fa77cea15 [Bowen Liang] ut
    fd9bb8f3a [Bowen Liang] update
    f9cfb98a9 [Bowen Liang] assertion for Spark 3.5
    f574e0da3 [Bowen Liang] assertion for Spark 3.5
    2449c27de [Bowen Liang] bring back single call ut for someone
    78786988d [Bowen Liang] bring back single call ut for someone
    b8e4a6319 [wforget] fix style
    0e26c08b4 [wforget] fix IcebergCatalogRangerSparkExtensionSuite with spark-3.5
    02781781f [wforget] enable iceberg tests in spark-3.5
    215e1b861 [wforget] fix TableCommands
    d019b1632 [wforget] followup
    ae17e076b [wforget] Merge remote-tracking branch 'origin/master' into authz-replacedata
    b88f77355 [Bowen Liang] update
    febcb3ee5 [Bowen Liang] isSparkV34OrGreater
    91d41b438 [Bowen Liang] replace data
    
    Lead-authored-by: Bowen Liang <[email protected]>
    Co-authored-by: wforget <[email protected]>
    Co-authored-by: Bowen Liang <[email protected]>
    Co-authored-by: liangbowen <[email protected]>
    Signed-off-by: Bowen Liang <[email protected]>
---
 .../src/main/resources/table_command_spec.json     | 21 +++++++
 .../plugin/spark/authz/util/AuthZUtils.scala       |  2 +
 .../IcebergCatalogPrivilegesBuilderSuite.scala     | 36 ++++++++++--
 .../spark/authz/V2CommandsPrivilegesSuite.scala    | 12 +++-
 .../plugin/spark/authz/gen/TableCommands.scala     | 12 ++++
 .../IcebergCatalogRangerSparkExtensionSuite.scala  | 64 ++++++++++++++--------
 pom.xml                                            |  4 +-
 7 files changed, 120 insertions(+), 31 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index dad13baa1..46f50f706 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -456,6 +456,27 @@
   } ],
   "opType" : "ALTERTABLE_REPLACECOLS",
   "queryDescs" : [ ]
+}, {
+  "classname" : "org.apache.spark.sql.catalyst.plans.logical.ReplaceData",
+  "tableDescs" : [ {
+    "fieldName" : "originalTable",
+    "fieldExtractor" : "DataSourceV2RelationTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ {
+    "fieldName" : "query",
+    "fieldExtractor" : "LogicalPlanQueryExtractor"
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.ReplaceTable",
   "tableDescs" : [ {
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index 4f7cbb9ef..e7ae54a44 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -87,6 +87,8 @@ private[authz] object AuthZUtils {
   lazy val isSparkV31OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.1"
   lazy val isSparkV32OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.2"
   lazy val isSparkV33OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.3"
+  lazy val isSparkV34OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.4"
+  lazy val isSparkV35OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.5"
 
   def quoteIfNeeded(part: String): String = {
     if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
index db81cd6e8..d06f2a9d8 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
@@ -63,7 +63,15 @@ class IcebergCatalogPrivilegesBuilderSuite extends V2CommandsPrivilegesSuite {
    val plan = sql(s"DELETE FROM $catalogTable WHERE key = 1 ").queryExecution.analyzed
     val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, spark)
     assert(operationType === QUERY)
-    assert(inputs.isEmpty)
+    if (isSparkV34OrGreater) {
+      assert(inputs.size === 1)
+      val po = inputs.head
+      assertEqualsIgnoreCase(namespace)(po.dbname)
+      assertEqualsIgnoreCase(catalogTableShort)(po.objectName)
+      assertContains(po.columns, "key", "value")
+    } else {
+      assert(inputs.size === 0)
+    }
     assert(outputs.size === 1)
     val po = outputs.head
     assert(po.actionType === PrivilegeObjectActionType.UPDATE)
@@ -80,7 +88,15 @@ class IcebergCatalogPrivilegesBuilderSuite extends V2CommandsPrivilegesSuite {
    val plan = sql(s"UPDATE $catalogTable SET value = 'b' WHERE key = 1 ").queryExecution.analyzed
     val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, spark)
     assert(operationType === QUERY)
-    assert(inputs.isEmpty)
+    if (isSparkV35OrGreater) {
+      assert(inputs.size === 1)
+      val po = inputs.head
+      assertEqualsIgnoreCase(namespace)(po.dbname)
+      assertEqualsIgnoreCase(catalogTableShort)(po.objectName)
+      assertContains(po.columns, "key", "value")
+    } else {
+      assert(inputs.size === 0)
+    }
     assert(outputs.size === 1)
     val po = outputs.head
     assert(po.actionType === PrivilegeObjectActionType.UPDATE)
@@ -104,8 +120,20 @@ class IcebergCatalogPrivilegesBuilderSuite extends V2CommandsPrivilegesSuite {
         s"WHEN NOT MATCHED THEN INSERT *").queryExecution.analyzed
      val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, spark)
       assert(operationType === QUERY)
-      assert(inputs.size === 1)
-      val po0 = inputs.head
+      if (isSparkV35OrGreater) {
+        assert(inputs.size === 2)
+        val po = inputs.head
+        assert(po.actionType === PrivilegeObjectActionType.OTHER)
+        assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+        assertEqualsIgnoreCase(namespace)(po.dbname)
+        assertEqualsIgnoreCase(table)(po.objectName)
+        assertContains(po.columns, "key", "value")
+        // The properties of RowLevelOperationTable are empty, so owner is none
+        assert(po.owner.isEmpty)
+      } else {
+        assert(inputs.size === 1)
+      }
+      val po0 = inputs.last
       assert(po0.actionType === PrivilegeObjectActionType.OTHER)
       assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
       assertEqualsIgnoreCase(namespace)(po0.dbname)
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
index 3ebea1ce9..149c9ba8f 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
@@ -161,7 +161,11 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
       assertEqualsIgnoreCase(namespace)(po.dbname)
       assertEqualsIgnoreCase(table)(po.objectName)
       assert(po.columns.isEmpty)
-      assert(po.owner.isEmpty)
+      if (isSparkV34OrGreater) {
+        checkV2TableOwner(po)
+      } else {
+        assert(po.owner.isEmpty)
+      }
       val accessType = AccessType(po, operationType, isInput = false)
       assert(accessType === AccessType.CREATE)
     }
@@ -193,7 +197,11 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
       assertEqualsIgnoreCase(namespace)(po.dbname)
       assertEqualsIgnoreCase(table)(po.objectName)
       assert(po.columns.isEmpty)
-      assert(po.owner.isEmpty)
+      if (isSparkV34OrGreater) {
+        checkV2TableOwner(po)
+      } else {
+        assert(po.owner.isEmpty)
+      }
       val accessType = AccessType(po, operationType, isInput = false)
       assert(accessType === AccessType.CREATE)
     }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
index 6a6800210..cf73cfbc6 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
@@ -257,6 +257,17 @@ object TableCommands {
     TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
   }
 
+  val ReplaceData = {
+    val cmd = "org.apache.spark.sql.catalyst.plans.logical.ReplaceData"
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val tableDesc =
+      TableDesc(
+        "originalTable",
+        classOf[DataSourceV2RelationTableExtractor],
+        actionTypeDesc = Some(actionTypeDesc))
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+  }
+
   val UpdateTable = {
     val cmd = "org.apache.spark.sql.catalyst.plans.logical.UpdateTable"
     val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
@@ -655,6 +666,7 @@ object TableCommands {
     RefreshTable,
     RefreshTableV2,
     RefreshTable3d0,
+    ReplaceData,
     ShowColumns,
     ShowCreateTable,
     ShowCreateTable.copy(classname =
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
index b22a812fd..64834b24d 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -16,17 +16,18 @@
  */
 package org.apache.kyuubi.plugin.spark.authz.ranger
 
-// scalastyle:off
 import scala.util.Try
 
 import org.scalatest.Outcome
 
+// scalastyle:off
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.plugin.spark.authz.AccessControlException
 import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
 import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 import org.apache.kyuubi.tags.IcebergTest
+import org.apache.kyuubi.util.AssertionUtils._
 
 /**
  * Tests for RangerSparkExtensionSuite
@@ -36,14 +37,14 @@ import org.apache.kyuubi.tags.IcebergTest
 class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
   override protected val catalogImpl: String = "hive"
   override protected val sqlExtensions: String =
-    if (isSparkV31OrGreater)
-      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
-    else ""
+    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
 
   val catalogV2 = "local"
   val namespace1 = icebergNamespace
   val table1 = "table1"
   val outputTable1 = "outputTable1"
+  val bobNamespace = "default_bob"
+  val bobSelectTable = "table_select_bob_1"
 
   override def withFixture(test: NoArgTest): Outcome = {
     assume(isSparkV31OrGreater)
@@ -76,6 +77,11 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
         admin,
        sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$outputTable1" +
           " (id int, name string, city string) USING iceberg"))
+
+      doAs(
+        admin,
+        sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$bobNamespace.$bobSelectTable" +
+          " (id int, name string, city string) USING iceberg"))
     }
   }
 
@@ -104,14 +110,19 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
       s" on [$namespace1/$table1/id]"))
 
     withSingleCallEnabled {
-      val e2 = intercept[AccessControlException](
-        doAs(
-          someone,
-          sql(mergeIntoSql)))
-      assert(e2.getMessage.contains(s"does not have" +
-        s" [select] privilege" +
-        s" on [$namespace1/$table1/id,$namespace1/table1/name,$namespace1/$table1/city]," +
-        s" [update] privilege on [$namespace1/$outputTable1]"))
+      interceptContains[AccessControlException](doAs(someone, sql(mergeIntoSql)))(
+        if (isSparkV35OrGreater) {
+          s"does not have [select] privilege on [$namespace1/table1/id" +
+            s",$namespace1/$table1/name,$namespace1/$table1/city]"
+        } else {
+          "does not have " +
+            s"[select] privilege on [$namespace1/$table1/id,$namespace1/$table1/name,$namespace1/$table1/city]," +
+            s" [update] privilege on [$bobNamespace/$bobSelectTable]"
+        })
+
+      interceptContains[AccessControlException] {
+        doAs(bob, sql(mergeIntoSql))
+      }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
     }
 
     doAs(admin, sql(mergeIntoSql))
@@ -119,13 +130,13 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
 
   test("[KYUUBI #3515] UPDATE TABLE") {
     // UpdateTable
-    val e1 = intercept[AccessControlException](
-      doAs(
-        someone,
-        sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
-          " WHERE id=1")))
-    assert(e1.getMessage.contains(s"does not have [update] privilege" +
-      s" on [$namespace1/$table1]"))
+    interceptContains[AccessControlException] {
+      doAs(someone, sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou'  WHERE id=1"))
+    }(if (isSparkV35OrGreater) {
+      s"does not have [select] privilege on [$namespace1/$table1/id]"
+    } else {
+      s"does not have [update] privilege on [$namespace1/$table1]"
+    })
 
     doAs(
       admin,
@@ -135,10 +146,17 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
 
   test("[KYUUBI #3515] DELETE FROM TABLE") {
     // DeleteFromTable
-    val e6 = intercept[AccessControlException](
-      doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2")))
-    assert(e6.getMessage.contains(s"does not have [update] privilege" +
-      s" on [$namespace1/$table1]"))
+    interceptContains[AccessControlException] {
+      doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
+    }(if (isSparkV34OrGreater) {
+      s"does not have [select] privilege on [$namespace1/$table1/id]"
+    } else {
+      s"does not have [update] privilege on [$namespace1/$table1]"
+    })
+
+    interceptContains[AccessControlException] {
+      doAs(bob, sql(s"DELETE FROM $catalogV2.$bobNamespace.$bobSelectTable WHERE id=2"))
+    }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
 
     doAs(admin, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
   }
diff --git a/pom.xml b/pom.xml
index 97be4c167..7cd2d42f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2239,7 +2239,7 @@
                 <delta.version>2.4.0</delta.version>
                 <spark.version>3.4.1</spark.version>
                 <spark.binary.version>3.4</spark.binary.version>
-                <maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow,org.apache.kyuubi.tags.IcebergTest</maven.plugin.scalatest.exclude.tags>
+                <maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow</maven.plugin.scalatest.exclude.tags>
             </properties>
         </profile>
 
@@ -2252,7 +2252,7 @@
                 <delta.version>2.4.0</delta.version>
                 <spark.version>3.5.0</spark.version>
                 <spark.binary.version>3.5</spark.binary.version>
-                <maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PySparkTest</maven.plugin.scalatest.exclude.tags>
+                <maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.PySparkTest</maven.plugin.scalatest.exclude.tags>
             </properties>
         </profile>
 
