This is an automated email from the ASF dual-hosted git repository.
diqiu50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 2f8c3294da [#10181] feat(spark-connector): Support TableWritePrivilege
for Spark 3.5+ authorization (#10194)
2f8c3294da is described below
commit 2f8c3294da10c31b73f5058e80d3d5f85a376113
Author: roryqi <[email protected]>
AuthorDate: Thu Mar 5 15:37:36 2026 +0800
[#10181] feat(spark-connector): Support TableWritePrivilege for Spark 3.5+
authorization (#10194)
### What changes were proposed in this pull request?
This PR adds support for Spark 3.5's TableWritePrivilege interface to
enforce MODIFY_TABLE privilege when performing write operations (INSERT,
UPDATE, DELETE). Changes:
- Add loadTable(Identifier, Set<TableWritePrivilege>) method to Spark
3.5+ catalogs (Hive, Iceberg, JDBC, Paimon)
- Add loadTableForWriting and loadGravitinoTableForWriting helper
methods in BaseCatalog
- Upgrade Spark 3.5 version from 3.5.1 to 3.5.3
- Add integration test to verify authorization behavior. In Spark 3.5+,
when a user without MODIFY_TABLE privilege attempts to INSERT, a
ForbiddenException will be thrown. In Spark 3.3/3.4, the same operation
succeeds because those versions don't support TableWritePrivilege.
### Why are the changes needed?
Fix: #10181
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added tests.
---------
Co-authored-by: Copilot <[email protected]>
---
gradle/libs.versions.toml | 2 +-
.../spark/connector/catalog/BaseCatalog.java | 31 +++++++++
.../test/authorization/SparkAuthorizationIT.java | 73 ++++++++++++++++++++++
.../integration/test/hive/SparkHiveCatalogIT.java | 5 +-
.../hive/GravitinoHiveCatalogSpark35.java | 14 ++++-
.../iceberg/GravitinoIcebergCatalogSpark35.java | 14 ++++-
.../jdbc/GravitinoJdbcCatalogSpark35.java | 11 ++++
.../paimon/GravitinoPaimonCatalogSpark35.java | 14 ++++-
.../test/authorization/SparkAuthorizationIT35.java | 23 ++++++-
9 files changed, 180 insertions(+), 7 deletions(-)
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 8d1495ba00..6e8010a77c 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -65,7 +65,7 @@ iceberg4connector = "1.6.1" # used for compile connectors
like Spark, Flink, etc
paimon = '1.2.0'
spark33 = "3.3.4"
spark34 = "3.4.3"
-spark35 = "3.5.1"
+spark35 = "3.5.3"
kyuubi4spark = "1.11.0"
kyuubi4authz = "1.10.2"
trino = '435'
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
index fc02032593..24a75bd4a9 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
@@ -20,6 +20,7 @@
package org.apache.gravitino.spark.connector.catalog;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -30,6 +31,7 @@ import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.authorization.Privilege;
import org.apache.gravitino.exceptions.ForbiddenException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
@@ -431,6 +433,20 @@ public abstract class BaseCatalog implements TableCatalog,
SupportsNamespaces, F
}
}
+ protected org.apache.gravitino.rel.Table
loadGravitinoTableForWriting(Identifier ident)
+ throws NoSuchTableException {
+ try {
+ String database = getDatabase(ident);
+ return gravitinoCatalogClient
+ .asTableCatalog()
+ .loadTable(
+ NameIdentifier.of(database, ident.name()),
+ Sets.newHashSet(Privilege.Name.MODIFY_TABLE));
+ } catch (org.apache.gravitino.exceptions.NoSuchTableException e) {
+ throw new NoSuchTableException(ident);
+ }
+ }
+
protected String getDatabase(Identifier sparkIdentifier) {
return getDatabase(sparkIdentifier.namespace());
}
@@ -498,6 +514,21 @@ public abstract class BaseCatalog implements TableCatalog,
SupportsNamespaces, F
throw new NoSuchFunctionException(ident);
}
+ protected Table loadTableForWriting(Identifier ident)
+ throws NoSuchTableException, ForbiddenException {
+ org.apache.gravitino.rel.Table gravitinoTable =
loadGravitinoTableForWriting(ident);
+ org.apache.spark.sql.connector.catalog.Table sparkTable =
loadSparkTable(ident);
+ // Will create a catalog specific table
+ return createSparkTable(
+ ident,
+ gravitinoTable,
+ sparkTable,
+ sparkCatalog,
+ propertiesConverter,
+ sparkTransformConverter,
+ sparkTypeConverter);
+ }
+
private void validateNamespace(String[] namespace) {
Preconditions.checkArgument(
namespace.length == 1,
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkAuthorizationIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkAuthorizationIT.java
index e91fc4b77f..9f61a1eff5 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkAuthorizationIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkAuthorizationIT.java
@@ -92,6 +92,10 @@ public abstract class SparkAuthorizationIT extends BaseIT {
protected final String TIME_ZONE_UTC = "UTC";
+ protected SparkSession getSparkSession() {
+ return normalUserSparkSession;
+ }
+
@BeforeAll
@Override
public void startIntegrationTest() throws Exception {
@@ -313,6 +317,75 @@ public abstract class SparkAuthorizationIT extends BaseIT {
}
}
+ @Test
+ @Order(6)
+ public void testLoadTableForWriting() {
+ GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+ String testTable = "test_write_table";
+ String testRole = "role_no_modify";
+
+ // Temporarily revoke ROLE from user
+ gravitinoMetalake.revokeRolesFromUser(ImmutableList.of(ROLE), NORMAL_USER);
+
+ // Create a role with SELECT_TABLE but without MODIFY_TABLE privilege
+ SecurableObject catalogObject =
+ SecurableObjects.ofCatalog(
+ JDBC_CATALOG,
+ ImmutableList.of(
+ Privileges.UseCatalog.allow(),
+ Privileges.UseSchema.allow(),
+ Privileges.SelectTable.allow(),
+ Privileges.CreateTable.allow()));
+ gravitinoMetalake.createRole(testRole, new HashMap<>(),
ImmutableList.of(catalogObject));
+ gravitinoMetalake.grantRolesToUser(ImmutableList.of(testRole),
NORMAL_USER);
+
+ TableCatalog tableCatalog =
gravitinoMetalake.loadCatalog(JDBC_CATALOG).asTableCatalog();
+ try {
+ // Create a test table first
+ normalUserSparkSession.sql("use " + JDBC_CATALOG);
+ normalUserSparkSession.sql("use " + JDBC_DATABASE);
+ tableCatalog.createTable(
+ NameIdentifier.of(JDBC_DATABASE, testTable),
+ new Column[] {
+ Column.of("id", Types.StringType.get()), Column.of("name",
Types.StringType.get())
+ },
+ "",
+ new HashMap<>());
+
+ // Deny MODIFY_TABLE privilege explicitly
+ gravitinoMetalake.grantPrivilegesToRole(
+ testRole,
+ MetadataObjects.of(
+ ImmutableList.of(JDBC_CATALOG, JDBC_DATABASE, testTable),
MetadataObject.Type.TABLE),
+ ImmutableList.of(Privileges.ModifyTable.deny()));
+
+ // Assert INSERT behavior - Spark 3.5+ should throw ForbiddenException,
+ // lower versions won't because they don't support TableWritePrivilege
+ assertInsertBehaviorWithoutModifyPrivilege(testTable);
+ } finally {
+ // Clean up
+ try {
+ tableCatalog.dropTable(NameIdentifier.of(JDBC_DATABASE, testTable));
+ } catch (Exception e) {
+ Assertions.fail("Failed to drop test table " + testTable, e);
+ }
+ gravitinoMetalake.revokeRolesFromUser(ImmutableList.of(testRole),
NORMAL_USER);
+ gravitinoMetalake.deleteRole(testRole);
+ gravitinoMetalake.grantRolesToUser(ImmutableList.of(ROLE), NORMAL_USER);
+ }
+ }
+
+ /**
+ * Assert the behavior when user tries to INSERT without MODIFY_TABLE
privilege. Spark 3.3/3.4
+ * don't support TableWritePrivilege, so INSERT will succeed. Spark 3.5+
supports it, so INSERT
+ * should throw ForbiddenException. Subclasses can override this method to
assert the expected
+ * behavior.
+ */
+ protected void assertInsertBehaviorWithoutModifyPrivilege(String tableName) {
+ // Spark 3.3/3.4 don't support TableWritePrivilege, so INSERT will succeed
+ normalUserSparkSession.sql(String.format("INSERT INTO %s VALUES (1,
'test')", tableName));
+ }
+
private void assertEqualsRows(List<Row> exceptRows, List<Row> actualRows) {
Assertions.assertEquals(exceptRows.size(), actualRows.size());
for (int i = 0; i < exceptRows.size(); i++) {
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
index 1beacc3853..b0cd8716cb 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
@@ -337,7 +337,8 @@ public abstract class SparkHiveCatalogIT extends
SparkCommonIT {
@Test
void testHiveFormatWithLocationTable() {
String tableName = "test_hive_format_with_location_table";
- String location = "/user/hive/external_db";
+
+ String location = hdfs.getUri() + "/user/hive/external_db";
Boolean[] isExternals = {Boolean.TRUE, Boolean.FALSE};
Arrays.stream(isExternals)
@@ -351,7 +352,7 @@ public abstract class SparkHiveCatalogIT extends
SparkCommonIT {
SparkTableInfo tableInfo = getTableInfo(tableName);
checkTableReadWrite(tableInfo);
-
Assertions.assertTrue(tableInfo.getTableLocation().equals(hdfs.getUri() +
location));
+ Assertions.assertEquals(location, tableInfo.getTableLocation());
});
}
diff --git
a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark35.java
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark35.java
index 2bc04ab808..9331c268f9 100644
---
a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark35.java
+++
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark35.java
@@ -18,4 +18,16 @@
*/
package org.apache.gravitino.spark.connector.hive;
-public class GravitinoHiveCatalogSpark35 extends GravitinoHiveCatalogSpark34 {}
+import java.util.Set;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableWritePrivilege;
+
+public class GravitinoHiveCatalogSpark35 extends GravitinoHiveCatalogSpark34 {
+ @Override
+ public Table loadTable(Identifier ident, Set<TableWritePrivilege>
writePrivileges)
+ throws NoSuchTableException {
+ return loadTableForWriting(ident);
+ }
+}
diff --git
a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark35.java
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark35.java
index 3ae8ded1a6..53484a9bb5 100644
---
a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark35.java
+++
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark35.java
@@ -18,4 +18,16 @@
*/
package org.apache.gravitino.spark.connector.iceberg;
-public class GravitinoIcebergCatalogSpark35 extends
GravitinoIcebergCatalogSpark34 {}
+import java.util.Set;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableWritePrivilege;
+
+public class GravitinoIcebergCatalogSpark35 extends
GravitinoIcebergCatalogSpark34 {
+ @Override
+ public Table loadTable(Identifier ident, Set<TableWritePrivilege>
writePrivileges)
+ throws NoSuchTableException {
+ return loadTableForWriting(ident);
+ }
+}
diff --git
a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java
index 1b10d63fa0..9ce36c2be1 100644
---
a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java
+++
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java
@@ -19,10 +19,15 @@
package org.apache.gravitino.spark.connector.jdbc;
+import java.util.Set;
import org.apache.gravitino.spark.connector.SparkTableChangeConverter;
import org.apache.gravitino.spark.connector.SparkTableChangeConverter34;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter34;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableWritePrivilege;
public class GravitinoJdbcCatalogSpark35 extends GravitinoJdbcCatalog {
@@ -36,4 +41,10 @@ public class GravitinoJdbcCatalogSpark35 extends
GravitinoJdbcCatalog {
SparkTypeConverter sparkTypeConverter) {
return new SparkTableChangeConverter34(sparkTypeConverter);
}
+
+ @Override
+ public Table loadTable(Identifier ident, Set<TableWritePrivilege>
writePrivileges)
+ throws NoSuchTableException {
+ return loadTableForWriting(ident);
+ }
}
diff --git
a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java
index 2c39af5b2f..2bd5daaa12 100644
---
a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java
+++
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java
@@ -18,4 +18,16 @@
*/
package org.apache.gravitino.spark.connector.paimon;
-public class GravitinoPaimonCatalogSpark35 extends
GravitinoPaimonCatalogSpark34 {}
+import java.util.Set;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableWritePrivilege;
+
+public class GravitinoPaimonCatalogSpark35 extends
GravitinoPaimonCatalogSpark34 {
+ @Override
+ public Table loadTable(Identifier ident, Set<TableWritePrivilege>
writePrivileges)
+ throws NoSuchTableException {
+ return loadTableForWriting(ident);
+ }
+}
diff --git
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkAuthorizationIT35.java
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkAuthorizationIT35.java
index 097078142f..01efb02e0e 100644
---
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkAuthorizationIT35.java
+++
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/authorization/SparkAuthorizationIT35.java
@@ -17,4 +17,25 @@
package org.apache.gravitino.spark.connector.integration.test.authorization;
-public class SparkAuthorizationIT35 extends SparkAuthorizationIT {}
+import static org.junit.Assert.assertThrows;
+
+import org.apache.gravitino.exceptions.ForbiddenException;
+
+/**
+ * Spark 3.5+ specific authorization integration tests. Overrides assertion
methods for
+ * functionality that behaves differently in Spark 3.5+, such as the loadTable
method with
+ * TableWritePrivilege support.
+ */
+public class SparkAuthorizationIT35 extends SparkAuthorizationIT {
+
+ /**
+ * In Spark 3.5+, INSERT should throw ForbiddenException when user doesn't
have MODIFY_TABLE
+ * privilege because Spark 3.5+ supports {@code loadTable(Identifier,
Set<TableWritePrivilege>)}.
+ */
+ @Override
+ protected void assertInsertBehaviorWithoutModifyPrivilege(String tableName) {
+ assertThrows(
+ ForbiddenException.class,
+ () -> getSparkSession().sql(String.format("INSERT INTO %s VALUES (1,
'test')", tableName)));
+ }
+}