This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bc26bb927 [core] Introduce privilege system for catalog based on
FileSystem (#2789)
bc26bb927 is described below
commit bc26bb92775a386add1cd9b202085f9aff66f320
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 29 20:35:09 2024 +0800
[core] Introduce privilege system for catalog based on FileSystem (#2789)
---
docs/content/maintenance/manage-privileges.md | 246 +++++++++++
.../java/org/apache/paimon/AbstractFileStore.java | 4 +-
.../org/apache/paimon/catalog/AbstractCatalog.java | 25 +-
.../java/org/apache/paimon/catalog/Catalog.java | 9 +
.../paimon/catalog/FileSystemCatalogFactory.java | 18 +-
.../privilege/AllGrantedPrivilegeChecker.java | 66 +++
.../org/apache/paimon/privilege/EntityType.java | 35 ++
.../privilege/FileBasedPrivilegeManager.java | 457 +++++++++++++++++++++
.../paimon/privilege/NoPrivilegeException.java | 39 ++
.../apache/paimon/privilege/PrivilegeChecker.java | 49 +++
.../paimon/privilege/PrivilegeCheckerImpl.java | 154 +++++++
.../apache/paimon/privilege/PrivilegeManager.java | 89 ++++
.../org/apache/paimon/privilege/PrivilegeType.java | 65 +++
.../apache/paimon/privilege/PrivilegedCatalog.java | 256 ++++++++++++
.../paimon/privilege/PrivilegedFileStore.java | 200 +++++++++
.../paimon/privilege/PrivilegedFileStoreTable.java | 308 ++++++++++++++
.../privilege/FileBasedPrivilegeManagerTest.java | 170 ++++++++
.../org/apache/paimon/stats/StatsTableTest.java | 4 +-
.../paimon/table/FileStoreTableTestBase.java | 4 +-
.../paimon/flink/procedure/CompactProcedure.java | 5 +-
.../flink/procedure/CompactDatabaseProcedure.java | 5 +-
.../paimon/flink/procedure/CompactProcedure.java | 5 +-
.../paimon/flink/procedure/MergeIntoProcedure.java | 5 +-
.../privilege/CreatePrivilegedUserProcedure.java | 33 +-
.../privilege/DropPrivilegedUserProcedure.java | 33 +-
.../privilege/GrantPrivilegeToUserProcedure.java | 79 ++++
.../privilege/InitFileBasedPrivilegeProcedure.java | 74 ++++
.../privilege/PrivilegeProcedureBase.java | 35 ++
.../RevokePrivilegeFromUserProcedure.java | 87 ++++
.../services/org.apache.paimon.factories.Factory | 7 +-
.../privilege/PrivilegeProcedureITCase.java | 323 +++++++++++++++
.../java/org/apache/paimon/hive/HiveCatalog.java | 29 +-
.../org/apache/paimon/hive/HiveCatalogFactory.java | 5 -
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 73 +++-
34 files changed, 2903 insertions(+), 93 deletions(-)
diff --git a/docs/content/maintenance/manage-privileges.md
b/docs/content/maintenance/manage-privileges.md
new file mode 100644
index 000000000..bf07e14b7
--- /dev/null
+++ b/docs/content/maintenance/manage-privileges.md
@@ -0,0 +1,246 @@
+---
+title: "Manage Privileges"
+weight: 10
+type: docs
+aliases:
+- /maintenance/manage-privileges.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Manage Privileges
+
+Paimon provides a privilege system on catalogs.
+Privileges determine which users can perform which operations on which objects,
+so that you can manage table access in a fine-grained manner.
+
+Currently, Paimon adopts the identity-based access control (IBAC) privilege
model.
+That is, privileges are directly assigned to users.
+
+{{< hint warning >}}
+This privilege system only prevents unwanted users from accessing tables
through catalogs.
+It does not block access through temporary table (by specifying table path on
filesystem),
+nor does it prevent user from directly modifying data files on filesystem.
+If you need more serious protection, use a filesystem with access management
instead.
+{{< /hint >}}
+
+## Basic Concepts
+
+We now introduce the basic concepts of the privilege system.
+
+### Object
+
+An object is an entity to which access can be granted. Unless allowed by a
grant, access is denied.
+
+Currently, the privilege system in Paimon has three types of objects: CATALOG,
DATABASE and TABLE.
+Objects have a logical hierarchy, which is related to the concept they
represent.
+For example:
+* If a user is granted a privilege on the catalog,
+he will also have this privilege on all databases and all tables in the
catalog.
+* If a user is granted a privilege on the database,
+he will also have this privilege on all tables in that database.
+* If a user is revoked a privilege from the catalog,
+he will also lose this privilege on all databases and all tables in the
catalog.
+* If a user is revoked a privilege from the database,
+he will also lose this privilege on all tables in that database.
+
+### Privilege
+
+A privilege is a defined level of access to an object.
+Multiple privileges can be used to control the granularity of access granted
on an object.
+Privileges are object-specific. Different objects may have different
privileges.
+
+Currently, we support the following privileges.
+
+| Privilege | Description
| Can be Granted on |
+|-------------|-------------------------------------------------------------------------------------------|--------------------------|
+| SELECT | Queries data in a table.
| TABLE, DATABASE, CATALOG |
+| INSERT | Inserts, updates or drops data in a table. Creates or drops
tags and branches in a table. | TABLE, DATABASE, CATALOG |
+| ALTER_TABLE | Alters metadata of a table, including table name, column
names, table options, etc. | TABLE, DATABASE, CATALOG |
+| DROP_TABLE | Drops a table. | TABLE, DATABASE, CATALOG |
+| CREATE_TABLE | Creates a table in a database. | DATABASE, CATALOG |
+| DROP_DATABASE | Drops a database. | DATABASE, CATALOG |
+| CREATE_DATABASE | Creates a database in the catalog. | CATALOG |
+| ADMIN | Creates or drops privileged users, grants or revokes privileges from
users in a catalog. | CATALOG |
+
+### User
+
+The entity to which privileges can be granted. Users are authenticated by
their password.
+
+When the privilege system is enabled, two special users will be created
automatically.
+
+* The `root` user,
+which is identified by the provided root password when enabling the privilege
system.
+This user always has all privileges in the catalog.
+* The `anonymous` user.
+This is the default user if no username and password is provided when creating
the catalog.
+
+## Enable Privileges
+
+Paimon currently only supports file-based privilege system.
+Only catalogs with `'metastore' = 'filesystem'` (the default value) or
`'metastore' = 'hive'` support such privilege system.
+
+To enable the privilege system on a filesystem / Hive catalog, do the
following steps.
+
+{{< tabs "enable-privileges" >}}
+
+{{< tab "Flink 1.18+" >}}
+Run the following Flink SQL.
+```sql
+-- use the catalog where you want to enable the privilege system
+USE CATALOG `my-catalog`;
+
+-- initialize privilege system by providing a root password
+-- change 'root-password' to the password you want
+CALL sys.init_file_based_privilege('root-password');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+After the privilege system is enabled,
+please re-create the catalog and authenticate as `root` to create other users
and grant them privileges.
+
+{{< hint info >}}
+Privilege system does not affect existing catalogs.
+That is, these catalogs can still access and modify the tables freely.
+Please drop and re-create all catalogs with the desired warehouse path
+if you want to use the privilege system in these catalogs.
+{{< /hint >}}
+
+## Accessing Privileged Catalogs
+
+To access a privileged catalog and to be authenticated as a user,
+you need to define `user` and `password` catalog options when creating the
catalog.
+For example, the following SQL creates a catalog while trying to be
authenticated as `root`,
+whose password is `mypassword`.
+
+{{< tabs "access-catalog" >}}
+
+{{< tab "Flink" >}}
+```sql
+CREATE CATALOG `my-catalog` WITH (
+ 'type' = 'paimon',
+ -- ...
+ 'user' = 'root',
+ 'password' = 'mypassword'
+);
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Creating Users
+
+You must be authenticated as a user with `ADMIN` privilege (for example,
`root`) to perform this operation.
+
+Do the following steps to create a user in the privilege system.
+
+{{< tabs "create-users" >}}
+
+{{< tab "Flink 1.18+" >}}
+Run the following Flink SQL.
+```sql
+-- use the catalog where you want to create a user
+-- you must be authenticated as a user with ADMIN privilege in this catalog
+USE CATALOG `my-catalog`;
+
+-- create a user authenticated by the specified password
+-- change 'user' and 'password' to the username and password you want
+CALL sys.create_privileged_user('user', 'password');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Dropping Users
+
+You must be authenticated as a user with `ADMIN` privilege (for example,
`root`) to perform this operation.
+
+Do the following steps to drop a user in the privilege system.
+
+{{< tabs "drop-users" >}}
+
+{{< tab "Flink 1.18+" >}}
+Run the following Flink SQL.
+```sql
+-- use the catalog where you want to drop a user
+-- you must be authenticated as a user with ADMIN privilege in this catalog
+USE CATALOG `my-catalog`;
+
+-- change 'user' to the username you want to drop
+CALL sys.drop_privileged_user('user');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Granting Privileges to Users
+
+You must be authenticated as a user with `ADMIN` privilege (for example,
`root`) to perform this operation.
+
+Do the following steps to grant a user with privilege in the privilege system.
+
+{{< tabs "grant-to-users" >}}
+
+{{< tab "Flink 1.18+" >}}
+Run the following Flink SQL.
+```sql
+-- use the catalog where you want to drop a user
+-- you must be authenticated as a user with ADMIN privilege in this catalog
+USE CATALOG `my-catalog`;
+
+-- you can change 'user' to the username you want, and 'SELECT' to other
privilege you want
+-- grant 'user' with privilege 'SELECT' on the whole catalog
+CALL sys.grant_privilege_to_user('user', 'SELECT');
+-- grant 'user' with privilege 'SELECT' on database my_db
+CALL sys.grant_privilege_to_user('user', 'SELECT', 'my_db');
+-- grant 'user' with privilege 'SELECT' on table my_db.my_tbl
+CALL sys.grant_privilege_to_user('user', 'SELECT', 'my_db', 'my_tbl');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Revoking Privileges to Users
+
+You must be authenticated as a user with `ADMIN` privilege (for example,
`root`) to perform this operation.
+
+Do the following steps to revoke a privilege from user in the privilege system.
+
+{{< tabs "revoke-from-users" >}}
+
+{{< tab "Flink 1.18+" >}}
+Run the following Flink SQL.
+```sql
+-- use the catalog where you want to drop a user
+-- you must be authenticated as a user with ADMIN privilege in this catalog
+USE CATALOG `my-catalog`;
+
+-- you can change 'user' to the username you want, and 'SELECT' to other
privilege you want
+-- revoke 'user' with privilege 'SELECT' on the whole catalog
+CALL sys.revoke_privilege_from_user('user', 'SELECT');
+-- revoke 'user' with privilege 'SELECT' on database my_db
+CALL sys.revoke_privilege_from_user('user', 'SELECT', 'my_db');
+-- revoke 'user' with privilege 'SELECT' on table my_db.my_tbl
+CALL sys.revoke_privilege_from_user('user', 'SELECT', 'my_db', 'my_tbl');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 07a73d3de..e893c0525 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -62,7 +62,7 @@ import static
org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
*
* @param <T> type of record to read and write.
*/
-public abstract class AbstractFileStore<T> implements FileStore<T> {
+abstract class AbstractFileStore<T> implements FileStore<T> {
protected final FileIO fileIO;
protected final SchemaManager schemaManager;
@@ -73,7 +73,7 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
@Nullable private final SegmentsCache<String> writeManifestCache;
- public AbstractFileStore(
+ protected AbstractFileStore(
FileIO fileIO,
SchemaManager schemaManager,
TableSchema schema,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 4a4fb04fd..7d26a2197 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -85,6 +85,16 @@ public abstract class AbstractCatalog implements Catalog {
this.catalogOptions = options;
}
+ @Override
+ public Map<String, String> options() {
+ return catalogOptions.toMap();
+ }
+
+ @Override
+ public FileIO fileIO() {
+ return fileIO;
+ }
+
@Override
public Optional<CatalogLockFactory> lockFactory() {
if (!lockEnabled()) {
@@ -369,17 +379,6 @@ public abstract class AbstractCatalog implements Catalog {
}
}
- /**
- * Get the warehouse path for the catalog if exists.
- *
- * @return The catalog warehouse path.
- */
- public abstract String warehouse();
-
- public Map<String, String> options() {
- return catalogOptions.toMap();
- }
-
protected abstract TableSchema getDataTableSchema(Identifier identifier)
throws TableNotExistException;
@@ -409,10 +408,6 @@ public abstract class AbstractCatalog implements Catalog {
tableDefaultOptions.forEach(options::putIfAbsent);
}
- public FileIO fileIO() {
- return fileIO;
- }
-
private String[] tableAndSystemName(Identifier identifier) {
String[] splits = StringUtils.split(identifier.getObjectName(),
SYSTEM_TABLE_SPLITTER);
if (splits.length != 2) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 99b71e8de..4f6341417 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -19,6 +19,7 @@
package org.apache.paimon.catalog;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.fs.FileIO;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -45,6 +46,14 @@ public interface Catalog extends AutoCloseable {
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
+ /** Warehouse root path containing all database directories in this
catalog. */
+ String warehouse();
+
+ /** Catalog options. */
+ Map<String, String> options();
+
+ FileIO fileIO();
+
/**
* Get lock factory from catalog. Lock is used to support multiple
concurrent writes on the
* object store.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
index 8f4b6eede..8a0b1643f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
@@ -20,6 +20,9 @@ package org.apache.paimon.catalog;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.privilege.FileBasedPrivilegeManager;
+import org.apache.paimon.privilege.PrivilegeManager;
+import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.table.TableType;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
@@ -40,6 +43,19 @@ public class FileSystemCatalogFactory implements
CatalogFactory {
throw new IllegalArgumentException(
"Only managed table is supported in File system catalog.");
}
- return new FileSystemCatalog(fileIO, warehouse, context.options());
+
+ Catalog catalog = new FileSystemCatalog(fileIO, warehouse,
context.options());
+
+ PrivilegeManager privilegeManager =
+ new FileBasedPrivilegeManager(
+ warehouse.toString(),
+ fileIO,
+ context.options().get(PrivilegedCatalog.USER),
+ context.options().get(PrivilegedCatalog.PASSWORD));
+ if (privilegeManager.privilegeEnabled()) {
+ catalog = new PrivilegedCatalog(catalog, privilegeManager);
+ }
+
+ return catalog;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/AllGrantedPrivilegeChecker.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/AllGrantedPrivilegeChecker.java
new file mode 100644
index 000000000..09944681a
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/AllGrantedPrivilegeChecker.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+
+/** Allows current user to perform all operations. */
+public class AllGrantedPrivilegeChecker implements PrivilegeChecker {
+
+ @Override
+ public void assertCanSelect(Identifier identifier) {}
+
+ @Override
+ public void assertCanInsert(Identifier identifier) {}
+
+ @Override
+ public void assertCanAlterTable(Identifier identifier) {}
+
+ @Override
+ public void assertCanDropTable(Identifier identifier) {}
+
+ @Override
+ public void assertCanCreateTable(String databaseName) {}
+
+ @Override
+ public void assertCanDropDatabase(String databaseName) {}
+
+ @Override
+ public void assertCanCreateDatabase() {}
+
+ @Override
+ public void assertCanCreateUser() {}
+
+ @Override
+ public void assertCanDropUser() {}
+
+ @Override
+ public void assertCanGrant(String identifier, PrivilegeType privilege) {}
+
+ @Override
+ public void assertCanRevoke() {}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return o != null && getClass() == o.getClass();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/EntityType.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/EntityType.java
new file mode 100644
index 000000000..134de1715
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/EntityType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.annotation.Public;
+
+/**
+ * Entity type in privilege system.
+ *
+ * <p>Supports identity-based access control (grant privilege directly to
user) and role-based
+ * access control (grant privilege to role, then assign role to user).
+ *
+ * @since 0.8.0
+ */
+@Public
+public enum EntityType {
+ USER,
+ ROLE
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
new file mode 100644
index 000000000..ec86e0cd1
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link PrivilegeManager} based on user and privilege system tables. The
directories of these
+ * system tables are created at the root of warehouse (table-root/user.sys and
+ * table-root/privilege.sys).
+ *
+ * <p>User table is the table which stores all user information. The schema of
user table is:
+ *
+ * <ul>
+ * <li>user (string): user name (primary key)
+ * <li>sha256 (bytes): sha256 of password
+ * </ul>
+ *
+ * <p>Privilege table is the table storing what privileges each user have. Its
schema is:
+ *
+ * <ul>
+ * <li>name (string): user or role name (primary key)
+ * <li>entity_type (string): user or role (primary key)
+ * <li>identifier (string): identifier of object (primary key)
+ * <li>privilege (string): name of privilege (primary key), see {@link
PrivilegeType}
+ * </ul>
+ */
+public class FileBasedPrivilegeManager implements PrivilegeManager {
+
+ private static final String USER_TABLE_DIR = "user.sys";
+ private static final RowType USER_TABLE_TYPE =
+ RowType.of(
+ new DataType[] {DataTypes.STRING(), DataTypes.BYTES()},
+ new String[] {"user", "sha256"});
+
+ private static final String PRIVILEGE_TABLE_DIR = "privilege.sys";
+ private static final RowType PRIVILEGE_TABLE_TYPE =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"name", "entity_type", "identifier",
"privilege"});
+
+ private final String warehouse;
+ private final FileIO fileIO;
+ private final String user;
+ private final byte[] sha256;
+
+ private Table userTable;
+ private Table privilegeTable;
+
+ public FileBasedPrivilegeManager(
+ String warehouse, FileIO fileIO, String user, String password) {
+ this.warehouse = warehouse;
+ this.fileIO = fileIO;
+ this.user = user;
+ this.sha256 = getSha256(password);
+ }
+
+ @Override
+ public boolean privilegeEnabled() {
+ return getUserTable(false) != null && getPrivilegeTable(false) != null;
+ }
+
+ @Override
+ public void initializePrivilege(String rootPassword) {
+ if (privilegeEnabled()) {
+ throw new IllegalStateException(
+ "Privilege system is already enabled in warehouse " +
warehouse);
+ }
+
+ createUserTable();
+ createUserImpl(USER_ROOT, rootPassword);
+ createUserImpl(USER_ANONYMOUS, PASSWORD_ANONYMOUS);
+
+ createPrivilegeTable();
+ }
+
+ @Override
+ public void createUser(String user, String password) {
+ getPrivilegeChecker().assertCanCreateUser();
+ if (userExists(user)) {
+ throw new IllegalArgumentException("User " + user + " already
exists");
+ }
+ createUserImpl(user, password);
+ }
+
+ @Override
+ public void dropUser(String user) {
+ getPrivilegeChecker().assertCanDropUser();
+ Preconditions.checkArgument(!USER_ROOT.equals(user), USER_ROOT + "
cannot be dropped");
+ Preconditions.checkArgument(
+ !USER_ANONYMOUS.equals(user), USER_ANONYMOUS + " cannot be
dropped");
+ dropUserImpl(user);
+ }
+
+ @Override
+ public void grant(String user, String identifier, PrivilegeType privilege)
{
+ getPrivilegeChecker().assertCanGrant(identifier, privilege);
+ Preconditions.checkArgument(
+ !USER_ROOT.equals(user), "Cannot change privilege for user " +
USER_ROOT);
+ if (!userExists(user)) {
+ throw new IllegalArgumentException("User " + user + " does not
exist");
+ }
+ grantImpl(
+ Collections.singletonList(
+ new PrivilegeEntry(user, EntityType.USER, identifier,
privilege)));
+ }
+
+ @Override
+ public int revoke(String user, String identifier, PrivilegeType privilege)
{
+ getPrivilegeChecker().assertCanRevoke();
+ Preconditions.checkArgument(
+ !USER_ROOT.equals(user), "Cannot change privilege for user " +
USER_ROOT);
+ if (!userExists(user)) {
+ throw new IllegalArgumentException("User " + user + " does not
exist");
+ }
+ int count = revokeImpl(user, identifier, privilege);
+ Preconditions.checkArgument(
+ count > 0,
+ String.format(
+ "User %s does not have privilege %s on %s. "
+ + "It's possible that the user has such
privilege on a higher level. "
+ + "Please check the privilege table.",
+ user, privilege, identifier));
+ return count;
+ }
+
+ @Override
+ public void objectRenamed(String oldName, String newName) {
+ Table privilegeTable = getPrivilegeTable(true);
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(PRIVILEGE_TABLE_TYPE);
+ Predicate predicate = predicateBuilder.equal(2,
BinaryString.fromString(oldName));
+
+ BatchWriteBuilder writeBuilder = privilegeTable.newBatchWriteBuilder();
+ try (CloseableIterator<InternalRow> it = read(privilegeTable,
predicate);
+ BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ while (it.hasNext()) {
+ InternalRow row = it.next();
+ GenericRow replaced =
+ GenericRow.of(
+ row.getString(0),
+ row.getString(1),
+ BinaryString.fromString(newName),
+ row.getString(3));
+ write.write(replaced);
+ }
+ commit.commit(write.prepareCommit());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void objectDropped(String identifier) {
+ Table privilegeTable = getPrivilegeTable(true);
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(PRIVILEGE_TABLE_TYPE);
+ Predicate predicate = predicateBuilder.startsWith(2,
BinaryString.fromString(identifier));
+ deleteAll(privilegeTable, predicate);
+ }
+
+ @Override
+ public PrivilegeChecker getPrivilegeChecker() {
+ assertUserPassword();
+ if (USER_ROOT.equals(user)) {
+ return new AllGrantedPrivilegeChecker();
+ }
+
+ Table privilegeTable = getPrivilegeTable(true);
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(PRIVILEGE_TABLE_TYPE);
+ Predicate predicate =
+ PredicateBuilder.and(
+ predicateBuilder.equal(0,
BinaryString.fromString(user)),
+ predicateBuilder.equal(1,
BinaryString.fromString(EntityType.USER.name())));
+
+ Map<String, Set<PrivilegeType>> privileges = new HashMap<>();
+ try (CloseableIterator<InternalRow> it = read(privilegeTable,
predicate)) {
+ while (it.hasNext()) {
+ InternalRow row = it.next();
+ privileges
+ .computeIfAbsent(row.getString(2).toString(), ignore
-> new HashSet<>())
+
.add(PrivilegeType.valueOf(row.getString(3).toString()));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return new PrivilegeCheckerImpl(user, privileges);
+ }
+
+ private void createUserImpl(String user, String password) {
+ byte[] sha256 = getSha256(password);
+ BatchWriteBuilder writeBuilder =
getUserTable(true).newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(BinaryString.fromString(user), sha256));
+ commit.commit(write.prepareCommit());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void dropUserImpl(String user) {
+ BatchWriteBuilder writeBuilder =
getUserTable(true).newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(
+ GenericRow.ofKind(RowKind.DELETE,
BinaryString.fromString(user), new byte[0]));
+ commit.commit(write.prepareCommit());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(PRIVILEGE_TABLE_TYPE);
+ Predicate predicate =
+ PredicateBuilder.and(
+ predicateBuilder.equal(0,
BinaryString.fromString(user)),
+ predicateBuilder.equal(1,
BinaryString.fromString(EntityType.USER.name())));
+ deleteAll(getPrivilegeTable(true), predicate);
+ }
+
+ private void grantImpl(List<PrivilegeEntry> entries) {
+ BatchWriteBuilder writeBuilder =
getPrivilegeTable(true).newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ for (PrivilegeEntry entry : entries) {
+ write.write(
+ GenericRow.of(
+ BinaryString.fromString(entry.name),
+
BinaryString.fromString(entry.entityType.name()),
+ BinaryString.fromString(entry.identifier),
+
BinaryString.fromString(entry.privilege.name())));
+ }
+ commit.commit(write.prepareCommit());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private int revokeImpl(String user, String identifier, PrivilegeType
privilege) {
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(PRIVILEGE_TABLE_TYPE);
+ Predicate predicate =
+ PredicateBuilder.and(
+ predicateBuilder.equal(0,
BinaryString.fromString(user)),
+ predicateBuilder.equal(1,
BinaryString.fromString(EntityType.USER.name())),
+ predicateBuilder.startsWith(2,
BinaryString.fromString(identifier)),
+ predicateBuilder.equal(3,
BinaryString.fromString(privilege.name())));
+ return deleteAll(getPrivilegeTable(true), predicate);
+ }
+
+ private static class PrivilegeEntry {
+ String name;
+ EntityType entityType;
+ String identifier;
+ PrivilegeType privilege;
+
+ private PrivilegeEntry(
+ String name, EntityType entityType, String identifier,
PrivilegeType privilege) {
+ this.name = name;
+ this.entityType = entityType;
+ this.identifier = identifier;
+ this.privilege = privilege;
+ }
+ }
+
+ private boolean userExists(String user) {
+ Table userTable = getUserTable(true);
+ Predicate predicate =
+ new PredicateBuilder(USER_TABLE_TYPE).equal(0,
BinaryString.fromString(user));
+ try (CloseableIterator<InternalRow> it = read(userTable, predicate)) {
+ return it.hasNext();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void assertUserPassword() {
+ Table userTable = getUserTable(true);
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(USER_TABLE_TYPE);
+ Predicate predicate =
+ PredicateBuilder.and(
+ predicateBuilder.equal(0,
BinaryString.fromString(user)),
+ predicateBuilder.equal(1, sha256));
+
+ try (CloseableIterator<InternalRow> it = read(userTable, predicate)) {
+ if (it.hasNext()) {
+ return;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ throw new IllegalArgumentException("User " + user + " not found, or
password incorrect.");
+ }
+
+ private Table getUserTable(boolean assertExists) {
+ userTable = getTable(userTable, USER_TABLE_DIR, assertExists);
+ return userTable;
+ }
+
+ private Table getPrivilegeTable(boolean assertExists) {
+ privilegeTable = getTable(privilegeTable, PRIVILEGE_TABLE_DIR,
assertExists);
+ return privilegeTable;
+ }
+
+ private Table getTable(Table lazy, String dir, boolean assertExists) {
+ if (lazy != null) {
+ return lazy;
+ }
+
+ Path tableRoot = new Path(warehouse, dir);
+ boolean tableExists;
+ try {
+ tableExists = fileIO.exists(tableRoot);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ if (tableExists) {
+ return FileStoreTableFactory.create(fileIO, tableRoot);
+ } else if (assertExists) {
+ throw new RuntimeException(
+ "Privilege system is not enabled in warehouse " +
warehouse + ".");
+ } else {
+ return null;
+ }
+ }
+
+ private void createUserTable() {
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, 1);
+ Path tableRoot = new Path(warehouse, USER_TABLE_DIR);
+ SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot);
+ try {
+ schemaManager.createTable(
+ new Schema(
+ USER_TABLE_TYPE.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("user"),
+ options.toMap(),
+ ""));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void createPrivilegeTable() {
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, 1);
+ Path tableRoot = new Path(warehouse, PRIVILEGE_TABLE_DIR);
+ SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot);
+ try {
+ schemaManager.createTable(
+ new Schema(
+ PRIVILEGE_TABLE_TYPE.getFields(),
+ Collections.emptyList(),
+ Arrays.asList("name", "entity_type", "privilege",
"identifier"),
+ options.toMap(),
+ ""));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private CloseableIterator<InternalRow> read(Table table, Predicate
predicate) {
+ ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ try {
+ return new RecordReaderIterator<>(
+ readBuilder.newRead().executeFilter().createReader(plan));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private int deleteAll(Table table, Predicate predicate) {
+ int count = 0;
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (CloseableIterator<InternalRow> it = read(table, predicate);
+ BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ while (it.hasNext()) {
+ InternalRow row = it.next();
+ row.setRowKind(RowKind.DELETE);
+ write.write(row);
+ count++;
+ }
+ commit.commit(write.prepareCommit());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return count;
+ }
+
+ private byte[] getSha256(String s) {
+ try {
+ return
MessageDigest.getInstance("SHA-256").digest(s.getBytes(StandardCharsets.UTF_8));
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/NoPrivilegeException.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/NoPrivilegeException.java
new file mode 100644
index 000000000..c868aebe7
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/NoPrivilegeException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/** Thrown when tries to perform an operation but the current user does not
have the privilege. */
+public class NoPrivilegeException extends RuntimeException {
+
+ public NoPrivilegeException(
+ String user, String objectType, String identifier,
PrivilegeType... privilege) {
+ super(
+ String.format(
+ "User %s doesn't have privilege %s on %s %s",
+ user,
+ Arrays.stream(privilege)
+ .map(Enum::name)
+ .collect(Collectors.joining(" or ")),
+ objectType,
+ identifier));
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeChecker.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeChecker.java
new file mode 100644
index 000000000..ebd579bdb
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeChecker.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.io.Serializable;
+
+/** Check if current user has privilege to perform related operations. */
+public interface PrivilegeChecker extends Serializable {
+
+ void assertCanSelect(Identifier identifier);
+
+ void assertCanInsert(Identifier identifier);
+
+ void assertCanAlterTable(Identifier identifier);
+
+ void assertCanDropTable(Identifier identifier);
+
+ void assertCanCreateTable(String databaseName);
+
+ void assertCanDropDatabase(String databaseName);
+
+ void assertCanCreateDatabase();
+
+ void assertCanCreateUser();
+
+ void assertCanDropUser();
+
+ void assertCanGrant(String identifier, PrivilegeType privilege);
+
+ void assertCanRevoke();
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeCheckerImpl.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeCheckerImpl.java
new file mode 100644
index 000000000..19c1813ee
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeCheckerImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Default implementation of {@link PrivilegeChecker}. */
+public class PrivilegeCheckerImpl implements PrivilegeChecker {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String user;
+ private final Map<String, Set<PrivilegeType>> privileges;
+
+ public PrivilegeCheckerImpl(String user, Map<String, Set<PrivilegeType>>
privileges) {
+ this.user = user;
+ this.privileges = privileges;
+ }
+
+ @Override
+ public void assertCanSelect(Identifier identifier) {
+ if (!check(identifier.getFullName(), PrivilegeType.SELECT)) {
+ throw new NoPrivilegeException(
+ user, "table", identifier.getFullName(),
PrivilegeType.SELECT);
+ }
+ }
+
+ @Override
+ public void assertCanInsert(Identifier identifier) {
+ if (!check(identifier.getFullName(), PrivilegeType.INSERT)) {
+ throw new NoPrivilegeException(
+ user, "table", identifier.getFullName(),
PrivilegeType.INSERT);
+ }
+ }
+
+ @Override
+ public void assertCanAlterTable(Identifier identifier) {
+ if (!check(identifier.getFullName(), PrivilegeType.ALTER_TABLE)) {
+ throw new NoPrivilegeException(
+ user, "table", identifier.getFullName(),
PrivilegeType.ALTER_TABLE);
+ }
+ }
+
+ @Override
+ public void assertCanDropTable(Identifier identifier) {
+ if (!check(identifier.getFullName(), PrivilegeType.DROP_TABLE)) {
+ throw new NoPrivilegeException(
+ user, "table", identifier.getFullName(),
PrivilegeType.DROP_TABLE);
+ }
+ }
+
+ @Override
+ public void assertCanCreateTable(String databaseName) {
+ if (!check(databaseName, PrivilegeType.CREATE_TABLE)) {
+ throw new NoPrivilegeException(
+ user, "database", databaseName,
PrivilegeType.CREATE_TABLE);
+ }
+ }
+
+ @Override
+ public void assertCanDropDatabase(String databaseName) {
+ if (!check(databaseName, PrivilegeType.DROP_DATABASE)) {
+ throw new NoPrivilegeException(
+ user, "database", databaseName,
PrivilegeType.DROP_DATABASE);
+ }
+ }
+
+ @Override
+ public void assertCanCreateDatabase() {
+ if (!check(
+ FileBasedPrivilegeManager.IDENTIFIER_WHOLE_CATALOG,
+ PrivilegeType.CREATE_DATABASE)) {
+ throw new NoPrivilegeException(
+ user,
+ "catalog",
+ FileBasedPrivilegeManager.IDENTIFIER_WHOLE_CATALOG,
+ PrivilegeType.DROP_DATABASE);
+ }
+ }
+
+ @Override
+ public void assertCanCreateUser() {
+ assertHasAdmin();
+ }
+
+ @Override
+ public void assertCanDropUser() {
+ assertHasAdmin();
+ }
+
+ @Override
+ public void assertCanGrant(String identifier, PrivilegeType privilege) {
+ assertHasAdmin();
+ }
+
+ @Override
+ public void assertCanRevoke() {
+ assertHasAdmin();
+ }
+
+ private void assertHasAdmin() {
+ if (!check(FileBasedPrivilegeManager.IDENTIFIER_WHOLE_CATALOG,
PrivilegeType.ADMIN)) {
+ throw new NoPrivilegeException(
+ user,
+ "catalog",
+ FileBasedPrivilegeManager.IDENTIFIER_WHOLE_CATALOG,
+ PrivilegeType.ADMIN);
+ }
+ }
+
+ private boolean check(String identifier, PrivilegeType privilege) {
+ Set<PrivilegeType> set = privileges.get(identifier);
+ if (set != null && set.contains(privilege)) {
+ return true;
+ } else if (identifier.isEmpty()) {
+ return false;
+ } else {
+ return check(
+ identifier.substring(0,
Math.max(identifier.lastIndexOf('.'), 0)), privilege);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PrivilegeCheckerImpl that = (PrivilegeCheckerImpl) o;
+ return Objects.equals(user, that.user) && Objects.equals(privileges,
that.privileges);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeManager.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeManager.java
new file mode 100644
index 000000000..52eb81ea5
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeManager.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+/**
+ * Interface for the privilege system which supports identity-based and
role-based access control.
+ *
+ * <p>When the privilege system is initialized two special users, root and
anonymous, will be
+ * created by default.
+ *
+ * <ul>
+ * <li>root is the user with all privileges. The password of root is set
when calling {@link
+ * PrivilegeManager#initializePrivilege}.
+ * <li>anonymous is the default username if no user and password are
provided when creating the
+ * catalog. The default password of anonymous is <code>anonymous</code>.
+ * </ul>
+ *
+ * <p>The privilege system also follows a hierarchical model. That is, If a
user has a privilege on
+ * an identifier A, he also has this privilege on identifier B, where A is a
prefix of B.
+ * Identifiers can be
+ *
+ * <ul>
+ * <li>the whole catalog (identifier is an empty string)
+ * <li>a database (identifier is <database-name>)
+ * <li>a table (identifier is <database-name>.<table-name>)
+ * </ul>
+ */
+public interface PrivilegeManager {
+
+ String USER_ROOT = "root";
+ String USER_ANONYMOUS = "anonymous";
+ String PASSWORD_ANONYMOUS = "anonymous";
+ String IDENTIFIER_WHOLE_CATALOG = "";
+
+ /** Check if the privilege system is enabled. */
+ boolean privilegeEnabled();
+
+ /**
+ * Initialize the privilege system if not enabled. Also creates two
special users: root and
+ * anonymous.
+ */
+ void initializePrivilege(String rootPassword);
+
+ /** Create {@code user} with {@code password}. */
+ void createUser(String user, String password);
+
+ /** Remove {@code user} from the privilege system. */
+ void dropUser(String user);
+
+ /** Grant {@code user} with {@code privilege} on {@code identifier}. */
+ void grant(String user, String identifier, PrivilegeType privilege);
+
+ /**
+ * Revoke {@code privilege} from {@code user} on {@code identifier}. Note
that {@code user} will
+ * also lose {@code privilege} on all descendants of {@code identifier}.
+ */
+ int revoke(String user, String identifier, PrivilegeType privilege);
+
+ /**
+ * Notify the privilege system that the identifier of an object is changed
from {@code oldName}
+ * to {@code newName}.
+ */
+ void objectRenamed(String oldName, String newName);
+
+ /** Notify the privilege system that the object with {@code identifier} is
dropped. */
+ void objectDropped(String identifier);
+
+ /**
+ * Get {@link PrivilegeChecker} of this privilege system to check if a
user has specific
+ * privileges on an object.
+ */
+ PrivilegeChecker getPrivilegeChecker();
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeType.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeType.java
new file mode 100644
index 000000000..375f5030d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeType.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.annotation.Public;
+
+/**
+ * Types of privilege.
+ *
+ * @since 0.7.0
+ */
+@Public
+public enum PrivilegeType {
+ SELECT(PrivilegeTarget.TABLE),
+ INSERT(PrivilegeTarget.TABLE),
+ ALTER_TABLE(PrivilegeTarget.TABLE),
+ DROP_TABLE(PrivilegeTarget.TABLE),
+
+ CREATE_TABLE(PrivilegeTarget.DATABASE),
+ DROP_DATABASE(PrivilegeTarget.DATABASE),
+
+ CREATE_DATABASE(PrivilegeTarget.CATALOG),
+ // you can create and drop users, grant and revoke any privileges to or
from others
+ ADMIN(PrivilegeTarget.CATALOG);
+
+ private final PrivilegeTarget target;
+
+ PrivilegeType(PrivilegeTarget target) {
+ this.target = target;
+ }
+
+ public boolean canGrantOnCatalog() {
+ return PrivilegeTarget.CATALOG.equals(target) || canGrantOnDatabase();
+ }
+
+ public boolean canGrantOnDatabase() {
+ return PrivilegeTarget.DATABASE.equals(target) || canGrantOnTable();
+ }
+
+ public boolean canGrantOnTable() {
+ return PrivilegeTarget.TABLE.equals(target);
+ }
+
+ private enum PrivilegeTarget {
+ CATALOG,
+ DATABASE,
+ TABLE
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
new file mode 100644
index 000000000..7c2ab9493
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** {@link Catalog} which supports privilege system. */
+public class PrivilegedCatalog implements Catalog {
+
+ public static final ConfigOption<String> USER =
+
ConfigOptions.key("user").stringType().defaultValue(PrivilegeManager.USER_ANONYMOUS);
+ public static final ConfigOption<String> PASSWORD =
+ ConfigOptions.key("password")
+ .stringType()
+ .defaultValue(PrivilegeManager.PASSWORD_ANONYMOUS);
+
+ private final Catalog wrapped;
+ private final PrivilegeManager privilegeManager;
+
+ public PrivilegedCatalog(Catalog wrapped, PrivilegeManager
privilegeManager) {
+ this.wrapped = wrapped;
+ this.privilegeManager = privilegeManager;
+ }
+
+ @Override
+ public boolean caseSensitive() {
+ return wrapped.caseSensitive();
+ }
+
+ @Override
+ public String warehouse() {
+ return wrapped.warehouse();
+ }
+
+ @Override
+ public Map<String, String> options() {
+ return wrapped.options();
+ }
+
+ @Override
+ public FileIO fileIO() {
+ return wrapped.fileIO();
+ }
+
+ @Override
+ public Optional<CatalogLockFactory> lockFactory() {
+ return wrapped.lockFactory();
+ }
+
+ @Override
+ public Optional<CatalogLockContext> lockContext() {
+ return wrapped.lockContext();
+ }
+
+ @Override
+ public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier
identifier) {
+ return wrapped.metastoreClientFactory(identifier);
+ }
+
+ @Override
+ public List<String> listDatabases() {
+ return wrapped.listDatabases();
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) {
+ return wrapped.databaseExists(databaseName);
+ }
+
+ @Override
+ public void createDatabase(String name, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException {
+ privilegeManager.getPrivilegeChecker().assertCanCreateDatabase();
+ wrapped.createDatabase(name, ignoreIfExists);
+ }
+
+ @Override
+ public void createDatabase(String name, boolean ignoreIfExists,
Map<String, String> properties)
+ throws DatabaseAlreadyExistException {
+ privilegeManager.getPrivilegeChecker().assertCanCreateDatabase();
+ wrapped.createDatabase(name, ignoreIfExists, properties);
+ }
+
+ @Override
+ public Map<String, String> loadDatabaseProperties(String name)
+ throws DatabaseNotExistException {
+ return wrapped.loadDatabaseProperties(name);
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException {
+ privilegeManager.getPrivilegeChecker().assertCanDropDatabase(name);
+ wrapped.dropDatabase(name, ignoreIfNotExists, cascade);
+ privilegeManager.objectDropped(name);
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws
DatabaseNotExistException {
+ return wrapped.listTables(databaseName);
+ }
+
+ @Override
+ public boolean tableExists(Identifier identifier) {
+ return wrapped.tableExists(identifier);
+ }
+
+ @Override
+ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
+ throws TableNotExistException {
+ privilegeManager.getPrivilegeChecker().assertCanDropTable(identifier);
+ wrapped.dropTable(identifier, ignoreIfNotExists);
+ privilegeManager.objectDropped(identifier.getFullName());
+ }
+
+ @Override
+ public void createTable(Identifier identifier, Schema schema, boolean
ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException {
+
privilegeManager.getPrivilegeChecker().assertCanCreateTable(identifier.getDatabaseName());
+ wrapped.createTable(identifier, schema, ignoreIfExists);
+ }
+
+ @Override
+ public void renameTable(Identifier fromTable, Identifier toTable, boolean
ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException {
+ privilegeManager.getPrivilegeChecker().assertCanAlterTable(fromTable);
+ wrapped.renameTable(fromTable, toTable, ignoreIfNotExists);
+ Preconditions.checkState(
+ wrapped.tableExists(toTable),
+ "Table "
+ + toTable
+ + " does not exist. There might be concurrent
renaming. "
+ + "Aborting updates in privilege system.");
+ privilegeManager.objectRenamed(fromTable.getFullName(),
toTable.getFullName());
+ }
+
+ @Override
+ public void alterTable(Identifier identifier, SchemaChange change, boolean
ignoreIfNotExists)
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
+ privilegeManager.getPrivilegeChecker().assertCanAlterTable(identifier);
+ wrapped.alterTable(identifier, change, ignoreIfNotExists);
+ }
+
+ @Override
+ public void alterTable(
+ Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
+ privilegeManager.getPrivilegeChecker().assertCanAlterTable(identifier);
+ wrapped.alterTable(identifier, changes, ignoreIfNotExists);
+ }
+
+ @Override
+ public Table getTable(Identifier identifier) throws TableNotExistException
{
+ Table table = wrapped.getTable(identifier);
+ if (table instanceof FileStoreTable) {
+ return new PrivilegedFileStoreTable(
+ (FileStoreTable) table,
privilegeManager.getPrivilegeChecker(), identifier);
+ } else {
+ return table;
+ }
+ }
+
+ @Override
+ public void dropPartition(Identifier identifier, Map<String, String>
partitions)
+ throws TableNotExistException, PartitionNotExistException {
+ privilegeManager.getPrivilegeChecker().assertCanInsert(identifier);
+ wrapped.dropPartition(identifier, partitions);
+ }
+
+ @Override
+ public void close() throws Exception {
+ wrapped.close();
+ }
+
+ public void createPrivilegedUser(String user, String password) {
+ privilegeManager.createUser(user, password);
+ }
+
+ public void dropPrivilegedUser(String user) {
+ privilegeManager.dropUser(user);
+ }
+
+ public void grantPrivilegeOnCatalog(String user, PrivilegeType privilege) {
+ Preconditions.checkArgument(
+ privilege.canGrantOnCatalog(),
+ "Privilege " + privilege + " can't be granted on a catalog");
+ privilegeManager.grant(user,
PrivilegeManager.IDENTIFIER_WHOLE_CATALOG, privilege);
+ }
+
+ public void grantPrivilegeOnDatabase(
+ String user, String databaseName, PrivilegeType privilege) {
+ Preconditions.checkArgument(
+ privilege.canGrantOnDatabase(),
+ "Privilege " + privilege + " can't be granted on a database");
+ Preconditions.checkArgument(
+ databaseExists(databaseName), "Database " + databaseName + "
does not exist");
+ privilegeManager.grant(user, databaseName, privilege);
+ }
+
+ public void grantPrivilegeOnTable(String user, Identifier identifier,
PrivilegeType privilege) {
+ Preconditions.checkArgument(
+ privilege.canGrantOnTable(),
+ "Privilege " + privilege + " can't be granted on a table");
+ Preconditions.checkArgument(
+ tableExists(identifier), "Table " + identifier + " does not
exist");
+ privilegeManager.grant(user, identifier.getFullName(), privilege);
+ }
+
+ /** Returns the number of privilege revoked. */
+ public int revokePrivilegeOnCatalog(String user, PrivilegeType privilege) {
+ return privilegeManager.revoke(user,
PrivilegeManager.IDENTIFIER_WHOLE_CATALOG, privilege);
+ }
+
+ /** Returns the number of privilege revoked. */
+ public int revokePrivilegeOnDatabase(
+ String user, String databaseName, PrivilegeType privilege) {
+ return privilegeManager.revoke(user, databaseName, privilege);
+ }
+
+ /** Returns the number of privilege revoked. */
+ public int revokePrivilegeOnTable(String user, Identifier identifier,
PrivilegeType privilege) {
+ return privilegeManager.revoke(user, identifier.getFullName(),
privilege);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
new file mode 100644
index 000000000..cf8e4c357
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.ManifestCacheFilter;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreCommit;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.FileStoreWrite;
+import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.service.ServiceManager;
+import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.TagCallback;
+import org.apache.paimon.tag.TagAutoManager;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** {@link FileStore} with privilege checks. */
+public class PrivilegedFileStore<T> implements FileStore<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileStore<T> wrapped;
+ private final PrivilegeChecker privilegeChecker;
+ private final Identifier identifier;
+
+ public PrivilegedFileStore(
+ FileStore<T> wrapped, PrivilegeChecker privilegeChecker,
Identifier identifier) {
+ this.wrapped = wrapped;
+ this.privilegeChecker = privilegeChecker;
+ this.identifier = identifier;
+ }
+
+ @Override
+ public FileStorePathFactory pathFactory() {
+ return wrapped.pathFactory();
+ }
+
+ @Override
+ public SnapshotManager snapshotManager() {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.snapshotManager();
+ }
+
+ @Override
+ public RowType partitionType() {
+ return wrapped.partitionType();
+ }
+
+ @Override
+ public CoreOptions options() {
+ return wrapped.options();
+ }
+
+ @Override
+ public BucketMode bucketMode() {
+ return wrapped.bucketMode();
+ }
+
+ @Override
+ public FileStoreScan newScan() {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.newScan();
+ }
+
+ @Override
+ public FileStoreScan newScan(String branchName) {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.newScan(branchName);
+ }
+
+ @Override
+ public ManifestList.Factory manifestListFactory() {
+ return wrapped.manifestListFactory();
+ }
+
+ @Override
+ public ManifestFile.Factory manifestFileFactory() {
+ return wrapped.manifestFileFactory();
+ }
+
+ @Override
+ public IndexFileHandler newIndexFileHandler() {
+ return wrapped.newIndexFileHandler();
+ }
+
+ @Override
+ public StatsFileHandler newStatsFileHandler() {
+ return wrapped.newStatsFileHandler();
+ }
+
+ @Override
+ public SplitRead<T> newRead() {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.newRead();
+ }
+
+ @Override
+ public FileStoreWrite<T> newWrite(String commitUser) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newWrite(commitUser);
+ }
+
+ @Override
+ public FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter
manifestFilter) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newWrite(commitUser, manifestFilter);
+ }
+
+ @Override
+ public FileStoreCommit newCommit(String commitUser) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newCommit(commitUser);
+ }
+
+ @Override
+ public FileStoreCommit newCommit(String commitUser, String branchName) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newCommit(commitUser, branchName);
+ }
+
+ @Override
+ public SnapshotDeletion newSnapshotDeletion() {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newSnapshotDeletion();
+ }
+
+ @Override
+ public TagManager newTagManager() {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newTagManager();
+ }
+
+ @Override
+ public TagDeletion newTagDeletion() {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newTagDeletion();
+ }
+
+ @Nullable
+ @Override
+ public PartitionExpire newPartitionExpire(String commitUser) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newPartitionExpire(commitUser);
+ }
+
+ @Override
+ public TagAutoManager newTagCreationManager() {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newTagCreationManager();
+ }
+
+ @Override
+ public ServiceManager newServiceManager() {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.newServiceManager();
+ }
+
+ @Override
+ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.mergeSchema(rowType, allowExplicitCast);
+ }
+
+ @Override
+ public List<TagCallback> createTagCallbacks() {
+ return wrapped.createTagCallbacks();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
new file mode 100644
index 000000000..5820e46b4
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestCacheFilter;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.ExpireSnapshots;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.query.LocalTableQuery;
+import org.apache.paimon.table.sink.RowKeyExtractor;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** {@link FileStoreTable} with privilege checks. */
+public class PrivilegedFileStoreTable implements FileStoreTable {
+
+ private final FileStoreTable wrapped;
+ private final PrivilegeChecker privilegeChecker;
+ private final Identifier identifier;
+
+ public PrivilegedFileStoreTable(
+ FileStoreTable wrapped, PrivilegeChecker privilegeChecker,
Identifier identifier) {
+ this.wrapped = wrapped;
+ this.privilegeChecker = privilegeChecker;
+ this.identifier = identifier;
+ }
+
+ @Override
+ public SnapshotReader newSnapshotReader() {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.newSnapshotReader();
+ }
+
+ @Override
+ public SnapshotReader newSnapshotReader(String branchName) {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.newSnapshotReader(branchName);
+ }
+
+ @Override
+ public CoreOptions coreOptions() {
+ return wrapped.coreOptions();
+ }
+
+ @Override
+ public SnapshotManager snapshotManager() {
+ return wrapped.snapshotManager();
+ }
+
+ @Override
+ public TagManager tagManager() {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.tagManager();
+ }
+
+ @Override
+ public BranchManager branchManager() {
+ privilegeChecker.assertCanSelect(identifier);
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.branchManager();
+ }
+
+ @Override
+ public Path location() {
+ return wrapped.location();
+ }
+
+ @Override
+ public FileIO fileIO() {
+ return wrapped.fileIO();
+ }
+
+ @Override
+ public TableSchema schema() {
+ return wrapped.schema();
+ }
+
+ @Override
+ public FileStore<?> store() {
+ return new PrivilegedFileStore<>(wrapped.store(), privilegeChecker,
identifier);
+ }
+
+ @Override
+ public BucketMode bucketMode() {
+ return wrapped.bucketMode();
+ }
+
+ @Override
+ public CatalogEnvironment catalogEnvironment() {
+ return wrapped.catalogEnvironment();
+ }
+
+ @Override
+ public Optional<Statistics> statistics() {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.statistics();
+ }
+
+ @Override
+ public FileStoreTable copy(Map<String, String> dynamicOptions) {
+ return new PrivilegedFileStoreTable(
+ wrapped.copy(dynamicOptions), privilegeChecker, identifier);
+ }
+
+ @Override
+ public FileStoreTable copy(TableSchema newTableSchema) {
+ return new PrivilegedFileStoreTable(
+ wrapped.copy(newTableSchema), privilegeChecker, identifier);
+ }
+
+ @Override
+ public void rollbackTo(long snapshotId) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.rollbackTo(snapshotId);
+ }
+
+ @Override
+ public void createTag(String tagName) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.createTag(tagName);
+ }
+
+ @Override
+ public void createTag(String tagName, long fromSnapshotId) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.createTag(tagName, fromSnapshotId);
+ }
+
+ @Override
+ public void createTag(String tagName, Duration timeRetained) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.createTag(tagName, timeRetained);
+ }
+
+ @Override
+ public void createTag(String tagName, long fromSnapshotId, Duration
timeRetained) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.createTag(tagName, fromSnapshotId, timeRetained);
+ }
+
+ @Override
+ public void deleteTag(String tagName) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.deleteTag(tagName);
+ }
+
+ @Override
+ public void rollbackTo(String tagName) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.rollbackTo(tagName);
+ }
+
+ @Override
+ public void createBranch(String branchName) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.createBranch(branchName);
+ }
+
+ @Override
+ public void createBranch(String branchName, long snapshotId) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.createBranch(branchName, snapshotId);
+ }
+
+ @Override
+ public void createBranch(String branchName, String tagName) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.createBranch(branchName, tagName);
+ }
+
+ @Override
+ public void deleteBranch(String branchName) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.deleteBranch(branchName);
+ }
+
+ @Override
+ public ExpireSnapshots newExpireSnapshots() {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newExpireSnapshots();
+ }
+
+ @Override
+ public ExpireSnapshots newExpireChangelog() {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newExpireChangelog();
+ }
+
+ @Override
+ public FileStoreTable copyWithoutTimeTravel(Map<String, String>
dynamicOptions) {
+ return new PrivilegedFileStoreTable(
+ wrapped.copyWithoutTimeTravel(dynamicOptions),
privilegeChecker, identifier);
+ }
+
+ @Override
+ public FileStoreTable copyWithLatestSchema() {
+ return new PrivilegedFileStoreTable(
+ wrapped.copyWithLatestSchema(), privilegeChecker, identifier);
+ }
+
+ @Override
+ public InnerTableScan newScan() {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.newScan();
+ }
+
+ @Override
+ public InnerStreamTableScan newStreamScan() {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.newStreamScan();
+ }
+
+ @Override
+ public InnerTableRead newRead() {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.newRead();
+ }
+
+ @Override
+ public TableWriteImpl<?> newWrite(String commitUser) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newWrite(commitUser);
+ }
+
+ @Override
+ public TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter
manifestFilter) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newWrite(commitUser, manifestFilter);
+ }
+
+ @Override
+ public TableCommitImpl newCommit(String commitUser) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newCommit(commitUser);
+ }
+
+ @Override
+ public TableCommitImpl newCommit(String commitUser, String branchName) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newCommit(commitUser, branchName);
+ }
+
+ @Override
+ public LocalTableQuery newLocalTableQuery() {
+ privilegeChecker.assertCanSelect(identifier);
+ return wrapped.newLocalTableQuery();
+ }
+
+ @Override
+ public boolean supportStreamingReadOverwrite() {
+ return wrapped.supportStreamingReadOverwrite();
+ }
+
+ @Override
+ public RowKeyExtractor createRowKeyExtractor() {
+ return wrapped.createRowKeyExtractor();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PrivilegedFileStoreTable that = (PrivilegedFileStoreTable) o;
+ return Objects.equals(wrapped, that.wrapped)
+ && Objects.equals(privilegeChecker, that.privilegeChecker)
+ && Objects.equals(identifier, that.identifier);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/privilege/FileBasedPrivilegeManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/privilege/FileBasedPrivilegeManagerTest.java
new file mode 100644
index 000000000..548a3925f
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/privilege/FileBasedPrivilegeManagerTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link FileBasedPrivilegeManager}. */
+public class FileBasedPrivilegeManagerTest {
+
+ private static final String PASSWORD_ROOT = "123456";
+
+ @TempDir public java.nio.file.Path tempPath;
+ private Path warehouse;
+
+ @BeforeEach
+ public void beforeEach() {
+ warehouse = new Path(TraceableFileIO.SCHEME + "://" +
tempPath.toString());
+ }
+
+ @AfterEach
+ public void afterEach() {
+ Predicate<Path> pathPredicate = path ->
path.toString().contains(tempPath.toString());
+ assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
+ assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
+ }
+
+ @Test
+ public void testInitializeMultipleTimes() throws Exception {
+ initPrivilege();
+ assertThrows(IllegalStateException.class, this::initPrivilege);
+ }
+
+ @Test
+ public void testUsers() throws Exception {
+ initPrivilege();
+
+ FileBasedPrivilegeManager rootManager = getPrivilegeManager("root",
PASSWORD_ROOT);
+ rootManager.createUser("test", "passwd");
+ assertThrows(
+ IllegalArgumentException.class, () ->
rootManager.createUser("test", "changed"));
+
+ FileBasedPrivilegeManager testManager = getPrivilegeManager("test",
"passwd");
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> getPrivilegeManager("test",
"wrong").getPrivilegeChecker());
+ assertThrows(NoPrivilegeException.class, () ->
testManager.createUser("test2", "passwd"));
+
+ rootManager.dropUser("test");
+ rootManager.dropUser("test");
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> getPrivilegeManager("test",
"passwd").getPrivilegeChecker());
+ assertThrows(IllegalArgumentException.class, () ->
rootManager.dropUser("root"));
+ assertThrows(IllegalArgumentException.class, () ->
rootManager.dropUser("anonymous"));
+ }
+
+ @Test
+ public void testGrantAndRevoke() throws Exception {
+ initPrivilege();
+
+ FileBasedPrivilegeManager rootManager = getPrivilegeManager("root",
PASSWORD_ROOT);
+ rootManager.createUser("test", "passwd");
+ rootManager.grant("test", "my_db", PrivilegeType.SELECT);
+ rootManager.grant("test", "another_db.my_tbl", PrivilegeType.SELECT);
+ rootManager.grant("test", "another_db.my_tbl", PrivilegeType.INSERT);
+ rootManager.grant("test", "another_db.another_tbl",
PrivilegeType.INSERT);
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> rootManager.grant("test2", "my_db",
PrivilegeType.SELECT));
+ FileBasedPrivilegeManager testManager = getPrivilegeManager("test",
"passwd");
+
+ {
+ PrivilegeChecker checker = testManager.getPrivilegeChecker();
+
+ checker.assertCanSelect(Identifier.create("my_db", "my_tbl"));
+ checker.assertCanSelect(Identifier.create("my_db", "another_tbl"));
+ checker.assertCanSelect(Identifier.create("another_db", "my_tbl"));
+ assertThrows(
+ NoPrivilegeException.class,
+ () ->
checker.assertCanSelect(Identifier.create("another_db", "another_tbl")));
+
+ assertThrows(
+ NoPrivilegeException.class,
+ () -> checker.assertCanInsert(Identifier.create("my_db",
"my_tbl")));
+ assertThrows(
+ NoPrivilegeException.class,
+ () -> checker.assertCanInsert(Identifier.create("my_db",
"another_tbl")));
+ checker.assertCanInsert(Identifier.create("another_db", "my_tbl"));
+ checker.assertCanInsert(Identifier.create("another_db",
"another_tbl"));
+
+ assertThrows(
+ NoPrivilegeException.class,
+ () ->
+ testManager.grant(
+ "test", "another_db.another_tbl",
PrivilegeType.SELECT));
+ }
+
+ rootManager.revoke("test", "another_db", PrivilegeType.INSERT);
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> rootManager.revoke("test2", "another_db",
PrivilegeType.INSERT));
+
+ {
+ PrivilegeChecker checker = testManager.getPrivilegeChecker();
+
+ checker.assertCanSelect(Identifier.create("my_db", "my_tbl"));
+ checker.assertCanSelect(Identifier.create("my_db", "another_tbl"));
+ checker.assertCanSelect(Identifier.create("another_db", "my_tbl"));
+ assertThrows(
+ NoPrivilegeException.class,
+ () ->
checker.assertCanSelect(Identifier.create("another_db", "another_tbl")));
+
+ assertThrows(
+ NoPrivilegeException.class,
+ () -> checker.assertCanInsert(Identifier.create("my_db",
"my_tbl")));
+ assertThrows(
+ NoPrivilegeException.class,
+ () -> checker.assertCanInsert(Identifier.create("my_db",
"another_tbl")));
+ assertThrows(
+ NoPrivilegeException.class,
+ () ->
checker.assertCanInsert(Identifier.create("another_db", "my_tbl")));
+ assertThrows(
+ NoPrivilegeException.class,
+ () ->
checker.assertCanInsert(Identifier.create("another_db", "another_tbl")));
+ }
+ }
+
+ private void initPrivilege() throws Exception {
+ getPrivilegeManager("anonymous",
"anonymous").initializePrivilege(PASSWORD_ROOT);
+ }
+
+ private FileBasedPrivilegeManager getPrivilegeManager(String user, String
password)
+ throws Exception {
+ return new FileBasedPrivilegeManager(
+ warehouse.toString(),
+ FileIO.get(warehouse, CatalogContext.create(warehouse)),
+ user,
+ password);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
index f4bdca414..a8d645c4b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
@@ -18,8 +18,8 @@
package org.apache.paimon.stats;
-import org.apache.paimon.AbstractFileStore;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.io.DataFileMeta;
@@ -67,7 +67,7 @@ public class StatsTableTest extends TableTestBase {
GenericRow.of(2, 1, 1));
FileStoreTable storeTable = (FileStoreTable) table;
- AbstractFileStore<?> store = (AbstractFileStore<?>) storeTable.store();
+ FileStore<?> store = storeTable.store();
String manifestListFile =
storeTable.snapshotManager().latestSnapshot().deltaManifestList();
ManifestList manifestList = store.manifestListFactory().create();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 138a30d5b..bccf0fe0e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -18,8 +18,8 @@
package org.apache.paimon.table;
-import org.apache.paimon.AbstractFileStore;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TestFileStore;
import org.apache.paimon.data.BinaryRow;
@@ -1182,7 +1182,7 @@ public abstract class FileStoreTableTestBase {
SnapshotManager snapshotManager = table.snapshotManager();
long latestSnapshotId = snapshotManager.latestSnapshotId();
- AbstractFileStore<?> store = (AbstractFileStore<?>) table.store();
+ FileStore<?> store = table.store();
Set<Path> filesInUse =
TestFileStore.getFilesInUse(
latestSnapshotId,
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index adf4adf2e..098fa17e9 100644
---
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.procedure;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.action.SortCompactAction;
@@ -78,8 +77,8 @@ public class CompactProcedure extends ProcedureBase {
String orderByColumns,
String tableOptions)
throws Exception {
- String warehouse = ((AbstractCatalog) catalog).warehouse();
- Map<String, String> catalogOptions = ((AbstractCatalog)
catalog).options();
+ String warehouse = catalog.warehouse();
+ Map<String, String> catalogOptions = catalog.options();
Map<String, String> tableConf =
StringUtils.isBlank(tableOptions)
? Collections.emptyMap()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index 168947813..4fea9b635 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.procedure;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.flink.action.CompactDatabaseAction;
import org.apache.paimon.utils.StringUtils;
@@ -99,8 +98,8 @@ public class CompactDatabaseProcedure extends ProcedureBase {
String excludingTables,
String tableOptions)
throws Exception {
- String warehouse = ((AbstractCatalog) catalog).warehouse();
- Map<String, String> catalogOptions = ((AbstractCatalog)
catalog).options();
+ String warehouse = catalog.warehouse();
+ Map<String, String> catalogOptions = catalog.options();
CompactDatabaseAction action =
new CompactDatabaseAction(warehouse, catalogOptions)
.includingDatabases(nullable(includingDatabases))
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 8a78ad3b8..01421c8bf 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.procedure;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.action.SortCompactAction;
@@ -62,8 +61,8 @@ public class CompactProcedure extends ProcedureBase {
String orderByColumns,
String tableOptions)
throws Exception {
- String warehouse = ((AbstractCatalog) catalog).warehouse();
- Map<String, String> catalogOptions = ((AbstractCatalog)
catalog).options();
+ String warehouse = catalog.warehouse();
+ Map<String, String> catalogOptions = catalog.options();
Map<String, String> tableConf =
isBlank(tableOptions)
? Collections.emptyMap()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
index c4236ac7d..acda2afd2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.procedure;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MergeIntoAction;
@@ -180,8 +179,8 @@ public class MergeIntoProcedure extends ProcedureBase {
String notMatchedInsertCondition,
String notMatchedInsertValues,
String matchedDeleteCondition) {
- String warehouse = ((AbstractCatalog) catalog).warehouse();
- Map<String, String> catalogOptions = ((AbstractCatalog)
catalog).options();
+ String warehouse = catalog.warehouse();
+ Map<String, String> catalogOptions = catalog.options();
Identifier identifier = Identifier.fromString(targetTableId);
MergeIntoAction action =
new MergeIntoAction(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/CreatePrivilegedUserProcedure.java
similarity index 53%
copy from
paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/CreatePrivilegedUserProcedure.java
index 8f4b6eede..76b1f6af8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/CreatePrivilegedUserProcedure.java
@@ -16,30 +16,29 @@
* limitations under the License.
*/
-package org.apache.paimon.catalog;
+package org.apache.paimon.flink.procedure.privilege;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.table.TableType;
+import org.apache.flink.table.procedure.ProcedureContext;
-import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
+/**
+ * Procedure to create a user for the privilege system. Only users with {@link
+ * org.apache.paimon.privilege.PrivilegeType#ADMIN} privilege can perform this
operation. Usage:
+ *
+ * <pre><code>
+ * CALL sys.create_privileged_user('username', 'password')
+ * </code></pre>
+ */
+public class CreatePrivilegedUserProcedure extends PrivilegeProcedureBase {
-/** Factory to create {@link FileSystemCatalog}. */
-public class FileSystemCatalogFactory implements CatalogFactory {
+ public static final String IDENTIFIER = "create_privileged_user";
- public static final String IDENTIFIER = "filesystem";
+ public String[] call(ProcedureContext procedureContext, String name,
String password) {
+ getPrivilegedCatalog().createPrivilegedUser(name, password);
+ return new String[] {String.format("User %s is created without any
privileges.", name)};
+ }
@Override
public String identifier() {
return IDENTIFIER;
}
-
- @Override
- public Catalog create(FileIO fileIO, Path warehouse, CatalogContext
context) {
- if (!TableType.MANAGED.equals(context.options().get(TABLE_TYPE))) {
- throw new IllegalArgumentException(
- "Only managed table is supported in File system catalog.");
- }
- return new FileSystemCatalog(fileIO, warehouse, context.options());
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/DropPrivilegedUserProcedure.java
similarity index 53%
copy from
paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/DropPrivilegedUserProcedure.java
index 8f4b6eede..3902a7f19 100644
---
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/DropPrivilegedUserProcedure.java
@@ -16,30 +16,29 @@
* limitations under the License.
*/
-package org.apache.paimon.catalog;
+package org.apache.paimon.flink.procedure.privilege;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.table.TableType;
+import org.apache.flink.table.procedure.ProcedureContext;
-import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
+/**
+ * Procedure to drop a user from the privilege system. Only users with {@link
+ * org.apache.paimon.privilege.PrivilegeType#ADMIN} privilege can perform this
operation. Usage:
+ *
+ * <pre><code>
+ * CALL sys.drop_privileged_user('username')
+ * </code></pre>
+ */
+public class DropPrivilegedUserProcedure extends PrivilegeProcedureBase {
-/** Factory to create {@link FileSystemCatalog}. */
-public class FileSystemCatalogFactory implements CatalogFactory {
+ public static final String IDENTIFIER = "drop_privileged_user";
- public static final String IDENTIFIER = "filesystem";
+ public String[] call(ProcedureContext procedureContext, String name) {
+ getPrivilegedCatalog().dropPrivilegedUser(name);
+ return new String[] {String.format("User %s is dropped.", name)};
+ }
@Override
public String identifier() {
return IDENTIFIER;
}
-
- @Override
- public Catalog create(FileIO fileIO, Path warehouse, CatalogContext
context) {
- if (!TableType.MANAGED.equals(context.options().get(TABLE_TYPE))) {
- throw new IllegalArgumentException(
- "Only managed table is supported in File system catalog.");
- }
- return new FileSystemCatalog(fileIO, warehouse, context.options());
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/GrantPrivilegeToUserProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/GrantPrivilegeToUserProcedure.java
new file mode 100644
index 000000000..a647f36bc
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/GrantPrivilegeToUserProcedure.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.privilege.PrivilegeType;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Procedure to grant privilege to a user. Privilege can be granted on the
whole catalog, a database
+ * or a table. Only users with {@link
org.apache.paimon.privilege.PrivilegeType#ADMIN} privilege can
+ * perform this operation. Usage:
+ *
+ * <pre><code>
+ * CALL sys.grant_privilege_to_user('username', 'privilege')
+ * CALL sys.grant_privilege_to_user('username', 'privilege', 'database')
+ * CALL sys.grant_privilege_to_user('username', 'privilege', 'database',
'table')
+ * </code></pre>
+ */
+public class GrantPrivilegeToUserProcedure extends PrivilegeProcedureBase {
+
+ public static final String IDENTIFIER = "grant_privilege_to_user";
+
+ public String[] call(ProcedureContext procedureContext, String user,
String privilege) {
+ getPrivilegedCatalog().grantPrivilegeOnCatalog(user,
PrivilegeType.valueOf(privilege));
+ return new String[] {
+ String.format("User %s is granted with privilege %s on the
catalog.", user, privilege)
+ };
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String user, String privilege,
String database) {
+ getPrivilegedCatalog()
+ .grantPrivilegeOnDatabase(user, database,
PrivilegeType.valueOf(privilege));
+ return new String[] {
+ String.format(
+ "User %s is granted with privilege %s on database %s.",
+ user, privilege, database)
+ };
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String user,
+ String privilege,
+ String database,
+ String table) {
+ Identifier identifier = Identifier.create(database, table);
+ getPrivilegedCatalog()
+ .grantPrivilegeOnTable(user, identifier,
PrivilegeType.valueOf(privilege));
+ return new String[] {
+ String.format(
+ "User %s is granted with privilege %s on table %s.",
+ user, privilege, identifier)
+ };
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
new file mode 100644
index 000000000..7d71678a8
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure.privilege;
+
+import org.apache.paimon.flink.procedure.ProcedureBase;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.privilege.FileBasedPrivilegeManager;
+import org.apache.paimon.privilege.PrivilegeManager;
+import org.apache.paimon.privilege.PrivilegedCatalog;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Procedure to initialize file-based privilege system in warehouse. This
procedure will
+ * automatically create a root user with the provided password. Usage:
+ *
+ * <pre><code>
+ * CALL sys.init_file_based_privilege('rootPassword')
+ * </code></pre>
+ */
+public class InitFileBasedPrivilegeProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "init_file_based_privilege";
+
+ private static final List<String> SUPPORTED_CATALOGS =
+ Arrays.asList(
+ "org.apache.paimon.catalog.FileSystemCatalog",
+ "org.apache.paimon.hive.HiveCatalog");
+
+ public String[] call(ProcedureContext procedureContext, String
rootPassword) {
+ if (catalog instanceof PrivilegedCatalog) {
+ throw new IllegalArgumentException("Catalog is already a
PrivilegedCatalog");
+ } else if (!SUPPORTED_CATALOGS.contains(catalog.getClass().getName()))
{
+ throw new IllegalArgumentException(
+ "File based privilege system does not support " +
catalog.getClass().getName());
+ }
+
+ Options options = new Options(catalog.options());
+ PrivilegeManager privilegeManager =
+ new FileBasedPrivilegeManager(
+ catalog.warehouse(),
+ catalog.fileIO(),
+ options.get(PrivilegedCatalog.USER),
+ options.get(PrivilegedCatalog.PASSWORD));
+ privilegeManager.initializePrivilege(rootPassword);
+ return new String[] {
+ "Privilege system is successfully enabled. Please drop and
re-create the catalog."
+ };
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureBase.java
new file mode 100644
index 000000000..1bc9a3ba8
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure.privilege;
+
+import org.apache.paimon.flink.procedure.ProcedureBase;
+import org.apache.paimon.privilege.PrivilegedCatalog;
+
+/** Base class for privilege-related procedures. */
+public abstract class PrivilegeProcedureBase extends ProcedureBase {
+
+ protected PrivilegedCatalog getPrivilegedCatalog() {
+ if (catalog instanceof PrivilegedCatalog) {
+ return (PrivilegedCatalog) catalog;
+ } else {
+ throw new UnsupportedOperationException(
+ "The catalog you specified does not support privilege
system");
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/RevokePrivilegeFromUserProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/RevokePrivilegeFromUserProcedure.java
new file mode 100644
index 000000000..d4834ac60
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/RevokePrivilegeFromUserProcedure.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.privilege.PrivilegeType;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Procedure to revoke privilege from a user. Privilege can be revoked from
the whole catalog, a
+ * database or a table. Only users with {@link
org.apache.paimon.privilege.PrivilegeType#ADMIN}
+ * privilege can perform this operation. Usage:
+ *
+ * <pre><code>
+ * CALL sys.revoke_privilege_from_user('username', 'privilege')
+ * CALL sys.revoke_privilege_from_user('username', 'privilege', 'database')
+ * CALL sys.revoke_privilege_from_user('username', 'privilege', 'database',
'table')
+ * </code></pre>
+ */
+public class RevokePrivilegeFromUserProcedure extends PrivilegeProcedureBase {
+
+ public static final String IDENTIFIER = "revoke_privilege_from_user";
+
+ public String[] call(ProcedureContext procedureContext, String user,
String privilege) {
+ int count =
+ getPrivilegedCatalog()
+ .revokePrivilegeOnCatalog(user,
PrivilegeType.valueOf(privilege));
+ return new String[] {
+ String.format("User %s is revoked with privilege %s on the
catalog.", user, privilege),
+ "Number of privileges revoked: " + count
+ };
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String user, String privilege,
String database) {
+ int count =
+ getPrivilegedCatalog()
+ .revokePrivilegeOnDatabase(
+ user, database,
PrivilegeType.valueOf(privilege));
+ return new String[] {
+ String.format(
+ "User %s is revoked with privilege %s on database %s.",
+ user, privilege, database),
+ "Number of privileges revoked: " + count
+ };
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String user,
+ String privilege,
+ String database,
+ String table) {
+ Identifier identifier = Identifier.create(database, table);
+ int count =
+ getPrivilegedCatalog()
+ .revokePrivilegeOnTable(user, identifier,
PrivilegeType.valueOf(privilege));
+ return new String[] {
+ String.format(
+ "User %s is revoked with privilege %s on table %s.",
+ user, privilege, identifier),
+ "Number of privileges revoked: " + count
+ };
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index fe463161b..943da3e16 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -44,4 +44,9 @@ org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
-org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
+org.apache.paimon.flink.procedure.privilege.InitFileBasedPrivilegeProcedure
+org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure
+org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
+org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
+org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
new file mode 100644
index 000000000..d432c02ea
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure.privilege;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.privilege.NoPrivilegeException;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for privilege related procedures. */
+public class PrivilegeProcedureITCase extends AbstractTestBase {
+
+ private String path;
+
+ @BeforeEach
+ public void beforeEach() {
+ path = getTempDirPath();
+ }
+
+ @Test
+ public void testUserPrivileges() throws Exception {
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s'\n"
+ + ")",
+ path));
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql("CREATE DATABASE mydb");
+ tEnv.executeSql("CREATE DATABASE mydb2");
+ tEnv.executeSql(
+ "CREATE TABLE mydb.T1 (\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ")");
+ tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 10), (2, 20), (3,
30)").await();
+ tEnv.executeSql("CALL sys.init_file_based_privilege('root-passwd')");
+
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG anonymouscat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s'\n"
+ + ")",
+ path));
+ tEnv.executeSql("USE CATALOG anonymouscat");
+ assertNoPrivilege(
+ () -> tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 11), (2,
21)").await());
+ assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY
k"));
+ assertNoPrivilege(() -> tEnv.executeSql("CREATE TABLE mydb.S1 ( a INT,
b INT )"));
+ assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE mydb.T1"));
+ assertNoPrivilege(() -> tEnv.executeSql("ALTER TABLE mydb.T1 RENAME TO
mydb.T2"));
+ assertNoPrivilege(() -> tEnv.executeSql("CREATE DATABASE anotherdb"));
+ assertNoPrivilege(() -> tEnv.executeSql("DROP DATABASE mydb CASCADE"));
+ assertNoPrivilege(
+ () -> tEnv.executeSql("CALL
sys.create_privileged_user('test2', 'test2-passwd')"));
+
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG rootcat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s',\n"
+ + " 'user' = 'root',\n"
+ + " 'password' = 'root-passwd'\n"
+ + ")",
+ path));
+ tEnv.executeSql("USE CATALOG rootcat");
+ tEnv.executeSql(
+ "CREATE TABLE mydb2.T2 (\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ")");
+ tEnv.executeSql("INSERT INTO mydb2.T2 VALUES (100, 1000), (200, 2000),
(300, 3000)")
+ .await();
+ tEnv.executeSql("CALL sys.create_privileged_user('test',
'test-passwd')");
+ tEnv.executeSql("CALL sys.grant_privilege_to_user('test',
'CREATE_TABLE', 'mydb')");
+ tEnv.executeSql("CALL sys.grant_privilege_to_user('test', 'SELECT',
'mydb')");
+ tEnv.executeSql("CALL sys.grant_privilege_to_user('test', 'INSERT',
'mydb')");
+
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG testcat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s',\n"
+ + " 'user' = 'test',\n"
+ + " 'password' = 'test-passwd'\n"
+ + ")",
+ path));
+ tEnv.executeSql("USE CATALOG testcat");
+ tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 12), (2, 22)").await();
+ assertThat(collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY k"))
+ .isEqualTo(Arrays.asList(Row.of(1, 12), Row.of(2, 22),
Row.of(3, 30)));
+ tEnv.executeSql("CREATE TABLE mydb.S1 ( a INT, b INT )");
+ tEnv.executeSql("INSERT INTO mydb.S1 VALUES (1, 100), (2, 200), (3,
300)").await();
+ assertThat(collect(tEnv, "SELECT * FROM mydb.S1 ORDER BY a"))
+ .isEqualTo(Arrays.asList(Row.of(1, 100), Row.of(2, 200),
Row.of(3, 300)));
+ assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE mydb.T1"));
+ assertNoPrivilege(() -> tEnv.executeSql("ALTER TABLE mydb.T1 RENAME TO
mydb.T2"));
+ assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE mydb.S1"));
+ assertNoPrivilege(() -> tEnv.executeSql("ALTER TABLE mydb.S1 RENAME TO
mydb.S2"));
+ assertNoPrivilege(() -> tEnv.executeSql("CREATE DATABASE anotherdb"));
+ assertNoPrivilege(() -> tEnv.executeSql("DROP DATABASE mydb CASCADE"));
+ assertNoPrivilege(
+ () -> tEnv.executeSql("CALL
sys.create_privileged_user('test2', 'test2-passwd')"));
+
+ tEnv.executeSql("USE CATALOG rootcat");
+ tEnv.executeSql("CALL sys.create_privileged_user('test2',
'test2-passwd')");
+ tEnv.executeSql("CALL sys.grant_privilege_to_user('test2', 'SELECT',
'mydb2')");
+ tEnv.executeSql("CALL sys.grant_privilege_to_user('test2', 'INSERT',
'mydb', 'T1')");
+ tEnv.executeSql("CALL sys.grant_privilege_to_user('test2', 'SELECT',
'mydb', 'S1')");
+ tEnv.executeSql("CALL sys.grant_privilege_to_user('test2',
'CREATE_DATABASE')");
+
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG test2cat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s',\n"
+ + " 'user' = 'test2',\n"
+ + " 'password' = 'test2-passwd'\n"
+ + ")",
+ path));
+ tEnv.executeSql("USE CATALOG test2cat");
+ tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 13), (2, 23)").await();
+ assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY
k"));
+ assertNoPrivilege(() -> tEnv.executeSql("CREATE TABLE mydb.S2 ( a INT,
b INT )"));
+ assertNoPrivilege(
+ () ->
+ tEnv.executeSql("INSERT INTO mydb.S1 VALUES (1, 100),
(2, 200), (3, 300)")
+ .await());
+ assertThat(collect(tEnv, "SELECT * FROM mydb.S1 ORDER BY a"))
+ .isEqualTo(Arrays.asList(Row.of(1, 100), Row.of(2, 200),
Row.of(3, 300)));
+ assertNoPrivilege(
+ () ->
+ tEnv.executeSql(
+ "INSERT INTO mydb2.T2 VALUES (100,
1001), (200, 2001), (300, 3001)")
+ .await());
+ assertThat(collect(tEnv, "SELECT * FROM mydb2.T2 ORDER BY k"))
+ .isEqualTo(Arrays.asList(Row.of(100, 1000), Row.of(200, 2000),
Row.of(300, 3000)));
+ tEnv.executeSql("CREATE DATABASE anotherdb");
+ assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE mydb.T1"));
+ assertNoPrivilege(() -> tEnv.executeSql("ALTER TABLE mydb.T1 RENAME TO
mydb.T2"));
+ assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE mydb.S1"));
+ assertNoPrivilege(() -> tEnv.executeSql("ALTER TABLE mydb.S1 RENAME TO
mydb.S2"));
+ assertNoPrivilege(() -> tEnv.executeSql("DROP DATABASE mydb CASCADE"));
+ assertNoPrivilege(
+ () -> tEnv.executeSql("CALL
sys.create_privileged_user('test3', 'test3-passwd')"));
+
+ tEnv.executeSql("USE CATALOG rootcat");
+ assertThat(collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY k"))
+ .isEqualTo(Arrays.asList(Row.of(1, 13), Row.of(2, 23),
Row.of(3, 30)));
+ tEnv.executeSql("CALL sys.revoke_privilege_from_user('test2',
'SELECT')");
+ tEnv.executeSql("CALL sys.drop_privileged_user('test')");
+
+ tEnv.executeSql("USE CATALOG testcat");
+ Exception e =
+ assertThrows(
+ Exception.class, () -> collect(tEnv, "SELECT * FROM
mydb.T1 ORDER BY k"));
+ assertThat(e).hasRootCauseMessage("User test not found, or password
incorrect.");
+
+ tEnv.executeSql("USE CATALOG test2cat");
+ assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb.S1 ORDER BY
a"));
+ assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb2.T2 ORDER BY
k"));
+ tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 14), (2, 24)").await();
+
+ tEnv.executeSql("USE CATALOG rootcat");
+ assertThat(collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY k"))
+ .isEqualTo(Arrays.asList(Row.of(1, 14), Row.of(2, 24),
Row.of(3, 30)));
+ tEnv.executeSql("DROP DATABASE mydb CASCADE");
+ tEnv.executeSql("DROP DATABASE mydb2 CASCADE");
+ }
+
+ @Test
+ public void testDropUser() throws Exception {
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ initializeSingleUserTest(tEnv);
+
+ tEnv.executeSql("USE CATALOG rootcat");
+ tEnv.executeSql("CALL sys.drop_privileged_user('test')");
+ tEnv.executeSql("CALL sys.create_privileged_user('test',
'test-passwd')");
+
+ tEnv.executeSql("USE CATALOG testcat");
+ assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY
k"));
+ assertNoPrivilege(
+ () -> tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 12), (2,
22)").await());
+ }
+
+ @Test
+ public void testDropObject() throws Exception {
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ initializeSingleUserTest(tEnv);
+
+ tEnv.executeSql("USE CATALOG rootcat");
+ tEnv.executeSql("DROP TABLE mydb.T1");
+ tEnv.executeSql(
+ "CREATE TABLE mydb.T1 (\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ")");
+
+ tEnv.executeSql("USE CATALOG testcat");
+ assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY
k"));
+ assertNoPrivilege(
+ () -> tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 12), (2,
22)").await());
+ }
+
+ @Test
+ public void testRenameObject() throws Exception {
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ initializeSingleUserTest(tEnv);
+
+ tEnv.executeSql("USE CATALOG rootcat");
+ tEnv.executeSql("ALTER TABLE mydb.T1 RENAME TO mydb.T2");
+
+ tEnv.executeSql("USE CATALOG testcat");
+ assertThat(collect(tEnv, "SELECT * FROM mydb.T2 ORDER BY k"))
+ .isEqualTo(Arrays.asList(Row.of(1, 11), Row.of(2, 21),
Row.of(3, 30)));
+ tEnv.executeSql("INSERT INTO mydb.T2 VALUES (1, 12), (2, 22)").await();
+ assertThat(collect(tEnv, "SELECT * FROM mydb.T2 ORDER BY k"))
+ .isEqualTo(Arrays.asList(Row.of(1, 12), Row.of(2, 22),
Row.of(3, 30)));
+ }
+
+ private void initializeSingleUserTest(TableEnvironment tEnv) throws
Exception {
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s'\n"
+ + ")",
+ path));
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql("CREATE DATABASE mydb");
+ tEnv.executeSql(
+ "CREATE TABLE mydb.T1 (\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ")");
+ tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 10), (2, 20), (3,
30)").await();
+ tEnv.executeSql("CALL sys.init_file_based_privilege('root-passwd')");
+
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG rootcat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s',\n"
+ + " 'user' = 'root',\n"
+ + " 'password' = 'root-passwd'\n"
+ + ")",
+ path));
+ tEnv.executeSql("USE CATALOG rootcat");
+ tEnv.executeSql("CALL sys.create_privileged_user('test',
'test-passwd')");
+ tEnv.executeSql("CALL sys.grant_privilege_to_user('test', 'SELECT',
'mydb', 'T1')");
+ tEnv.executeSql("CALL sys.grant_privilege_to_user('test', 'INSERT',
'mydb', 'T1')");
+
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG testcat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s',\n"
+ + " 'user' = 'test',\n"
+ + " 'password' = 'test-passwd'\n"
+ + ")",
+ path));
+ tEnv.executeSql("USE CATALOG testcat");
+ assertThat(collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY k"))
+ .isEqualTo(Arrays.asList(Row.of(1, 10), Row.of(2, 20),
Row.of(3, 30)));
+ tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 11), (2, 21)").await();
+ assertThat(collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY k"))
+ .isEqualTo(Arrays.asList(Row.of(1, 11), Row.of(2, 21),
Row.of(3, 30)));
+ }
+
+ private List<Row> collect(TableEnvironment tEnv, String sql) throws
Exception {
+ List<Row> result = new ArrayList<>();
+ try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+ while (it.hasNext()) {
+ result.add(it.next());
+ }
+ }
+ return result;
+ }
+
+ private void assertNoPrivilege(Executable executable) {
+ Exception e = assertThrows(Exception.class, executable);
+ if (e.getCause() != null) {
+ assertThat(e).hasRootCauseInstanceOf(NoPrivilegeException.class);
+ } else {
+ assertThat(e).isInstanceOf(NoPrivilegeException.class);
+ }
+ }
+}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 32d25e7db..06dbc5a2b 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -33,6 +33,9 @@ import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;
+import org.apache.paimon.privilege.FileBasedPrivilegeManager;
+import org.apache.paimon.privilege.PrivilegeManager;
+import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -684,12 +687,26 @@ public class HiveCatalog extends AbstractCatalog {
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- return new HiveCatalog(
- fileIO,
- hiveConf,
- options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
- options,
- warehouse.toUri().toString());
+
+ Catalog catalog =
+ new HiveCatalog(
+ fileIO,
+ hiveConf,
+ options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
+ options,
+ warehouse.toUri().toString());
+
+ PrivilegeManager privilegeManager =
+ new FileBasedPrivilegeManager(
+ warehouse.toString(),
+ fileIO,
+ context.options().get(PrivilegedCatalog.USER),
+ context.options().get(PrivilegedCatalog.PASSWORD));
+ if (privilegeManager.privilegeEnabled()) {
+ catalog = new PrivilegedCatalog(catalog, privilegeManager);
+ }
+
+ return catalog;
}
public static HiveConf createHiveConf(CatalogContext context) {
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
index cc51df265..95da00371 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
@@ -24,16 +24,11 @@ import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
/** Factory to create {@link HiveCatalog}. */
public class HiveCatalogFactory implements CatalogFactory {
- private static final Logger LOG =
LoggerFactory.getLogger(HiveCatalogFactory.class);
-
public static final ConfigOption<String> METASTORE_CLIENT_CLASS =
ConfigOptions.key("metastore.client.class")
.stringType()
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 4d9753bab..a5c494c1e 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.hive.annotation.Minio;
import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
+import org.apache.paimon.privilege.NoPrivilegeException;
import org.apache.paimon.s3.MinioTestContainer;
import com.klarna.hiverunner.HiveShell;
@@ -43,6 +44,7 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.jupiter.api.function.Executable;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
@@ -67,6 +69,7 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/** IT cases for using Paimon {@link HiveCatalog} together with Paimon Hive
connector. */
@RunWith(PaimonEmbeddedHiveRunner.class)
@@ -76,6 +79,7 @@ public abstract class HiveCatalogITCaseBase {
protected String path;
protected TableEnvironment tEnv;
+ private boolean locationInProperties;
@HiveSQL(files = {})
protected static HiveShell hiveShell;
@@ -83,26 +87,41 @@ public abstract class HiveCatalogITCaseBase {
@Minio private static MinioTestContainer minioTestContainer;
private void before(boolean locationInProperties) throws Exception {
- Map<String, String> catalogProperties = new HashMap<>();
+ this.locationInProperties = locationInProperties;
+ if (locationInProperties) {
+ path = minioTestContainer.getS3UriForDefaultBucket() + "/" +
UUID.randomUUID();
+ } else {
+ path = folder.newFolder().toURI().toString();
+ }
+ registerHiveCatalog("my_hive", new HashMap<>());
+
+ tEnv.executeSql("USE CATALOG my_hive").await();
+ tEnv.executeSql("DROP DATABASE IF EXISTS test_db CASCADE");
+ tEnv.executeSql("CREATE DATABASE test_db").await();
+ tEnv.executeSql("USE test_db").await();
+ hiveShell.execute("USE test_db");
+ hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
+ hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200,
'Table')");
+ }
+
+ private void registerHiveCatalog(String catalogName, Map<String, String>
catalogProperties)
+ throws Exception {
catalogProperties.put("type", "paimon");
catalogProperties.put("metastore", "hive");
catalogProperties.put("uri", "");
catalogProperties.put("lock.enabled", "true");
catalogProperties.put("location-in-properties",
String.valueOf(locationInProperties));
+ catalogProperties.put("warehouse", path);
if (locationInProperties) {
- path = minioTestContainer.getS3UriForDefaultBucket() + "/" +
UUID.randomUUID();
catalogProperties.putAll(minioTestContainer.getS3ConfigOptions());
- } else {
- path = folder.newFolder().toURI().toString();
}
- catalogProperties.put("warehouse", path);
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = TableEnvironmentImpl.create(settings);
tEnv.executeSql(
String.join(
"\n",
- "CREATE CATALOG my_hive WITH (",
+ "CREATE CATALOG " + catalogName + " WITH (",
catalogProperties.entrySet().stream()
.map(
e ->
@@ -112,13 +131,6 @@ public abstract class HiveCatalogITCaseBase {
.collect(Collectors.joining(",\n")),
")"))
.await();
- tEnv.executeSql("USE CATALOG my_hive").await();
- tEnv.executeSql("DROP DATABASE IF EXISTS test_db CASCADE");
- tEnv.executeSql("CREATE DATABASE test_db").await();
- tEnv.executeSql("USE test_db").await();
- hiveShell.execute("USE test_db");
- hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
- hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200,
'Table')");
}
private void after() {
@@ -1034,6 +1046,41 @@ public abstract class HiveCatalogITCaseBase {
}
}
+ @Test
+ public void testFileBasedPrivilege() throws Exception {
+ tEnv.executeSql("CREATE TABLE t ( a INT, b INT )");
+ tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
+ tEnv.executeSql("CALL sys.init_file_based_privilege('root-passwd')");
+
+ Map<String, String> rootCatalogProperties = new HashMap<>();
+ rootCatalogProperties.put("user", "root");
+ rootCatalogProperties.put("password", "root-passwd");
+ registerHiveCatalog("my_hive_root", rootCatalogProperties);
+ tEnv.executeSql("USE CATALOG my_hive_root");
+ tEnv.executeSql("CALL sys.create_privileged_user('test',
'test-passwd')");
+ tEnv.executeSql("CALL sys.grant_privilege_to_user('test', 'SELECT',
'test_db')");
+
+ Map<String, String> testCatalogProperties = new HashMap<>();
+ testCatalogProperties.put("user", "test");
+ testCatalogProperties.put("password", "test-passwd");
+ registerHiveCatalog("my_hive_test", testCatalogProperties);
+ tEnv.executeSql("USE CATALOG my_hive_test");
+ tEnv.executeSql("USE test_db");
+ assertThat(collect("SELECT * FROM t ORDER BY a"))
+ .containsExactly(Row.of(1, 10), Row.of(2, 20));
+ assertNoPrivilege(() -> tEnv.executeSql("INSERT INTO t VALUES (3,
30)").await());
+ assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE t").await());
+ }
+
+ private void assertNoPrivilege(Executable executable) {
+ Exception e = assertThrows(Exception.class, executable);
+ if (e.getCause() != null) {
+ assertThat(e).hasRootCauseInstanceOf(NoPrivilegeException.class);
+ } else {
+ assertThat(e).isInstanceOf(NoPrivilegeException.class);
+ }
+ }
+
protected List<Row> collect(String sql) throws Exception {
List<Row> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {