This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bc26bb927 [core] Introduce privilege system for catalog based on 
FileSystem (#2789)
bc26bb927 is described below

commit bc26bb92775a386add1cd9b202085f9aff66f320
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 29 20:35:09 2024 +0800

    [core] Introduce privilege system for catalog based on FileSystem (#2789)
---
 docs/content/maintenance/manage-privileges.md      | 246 +++++++++++
 .../java/org/apache/paimon/AbstractFileStore.java  |   4 +-
 .../org/apache/paimon/catalog/AbstractCatalog.java |  25 +-
 .../java/org/apache/paimon/catalog/Catalog.java    |   9 +
 .../paimon/catalog/FileSystemCatalogFactory.java   |  18 +-
 .../privilege/AllGrantedPrivilegeChecker.java      |  66 +++
 .../org/apache/paimon/privilege/EntityType.java    |  35 ++
 .../privilege/FileBasedPrivilegeManager.java       | 457 +++++++++++++++++++++
 .../paimon/privilege/NoPrivilegeException.java     |  39 ++
 .../apache/paimon/privilege/PrivilegeChecker.java  |  49 +++
 .../paimon/privilege/PrivilegeCheckerImpl.java     | 154 +++++++
 .../apache/paimon/privilege/PrivilegeManager.java  |  89 ++++
 .../org/apache/paimon/privilege/PrivilegeType.java |  65 +++
 .../apache/paimon/privilege/PrivilegedCatalog.java | 256 ++++++++++++
 .../paimon/privilege/PrivilegedFileStore.java      | 200 +++++++++
 .../paimon/privilege/PrivilegedFileStoreTable.java | 308 ++++++++++++++
 .../privilege/FileBasedPrivilegeManagerTest.java   | 170 ++++++++
 .../org/apache/paimon/stats/StatsTableTest.java    |   4 +-
 .../paimon/table/FileStoreTableTestBase.java       |   4 +-
 .../paimon/flink/procedure/CompactProcedure.java   |   5 +-
 .../flink/procedure/CompactDatabaseProcedure.java  |   5 +-
 .../paimon/flink/procedure/CompactProcedure.java   |   5 +-
 .../paimon/flink/procedure/MergeIntoProcedure.java |   5 +-
 .../privilege/CreatePrivilegedUserProcedure.java   |  33 +-
 .../privilege/DropPrivilegedUserProcedure.java     |  33 +-
 .../privilege/GrantPrivilegeToUserProcedure.java   |  79 ++++
 .../privilege/InitFileBasedPrivilegeProcedure.java |  74 ++++
 .../privilege/PrivilegeProcedureBase.java          |  35 ++
 .../RevokePrivilegeFromUserProcedure.java          |  87 ++++
 .../services/org.apache.paimon.factories.Factory   |   7 +-
 .../privilege/PrivilegeProcedureITCase.java        | 323 +++++++++++++++
 .../java/org/apache/paimon/hive/HiveCatalog.java   |  29 +-
 .../org/apache/paimon/hive/HiveCatalogFactory.java |   5 -
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  |  73 +++-
 34 files changed, 2903 insertions(+), 93 deletions(-)

diff --git a/docs/content/maintenance/manage-privileges.md 
b/docs/content/maintenance/manage-privileges.md
new file mode 100644
index 000000000..bf07e14b7
--- /dev/null
+++ b/docs/content/maintenance/manage-privileges.md
@@ -0,0 +1,246 @@
+---
+title: "Manage Privileges"
+weight: 10
+type: docs
+aliases:
+- /maintenance/manage-privileges.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Manage Privileges
+
+Paimon provides a privilege system on catalogs.
+Privileges determine which users can perform which operations on which objects,
+so that you can manage table access in a fine-grained manner.
+
+Currently, Paimon adopts the identity-based access control (IBAC) privilege 
model.
+That is, privileges are directly assigned to users.
+
+{{< hint warning >}}
+This privilege system only prevents unwanted users from accessing tables 
through catalogs.
+It does not block access through temporary table (by specifying table path on 
filesystem),
+nor does it prevent user from directly modifying data files on filesystem.
+If you need more serious protection, use a filesystem with access management 
instead.
+{{< /hint >}}
+
+## Basic Concepts
+
+We now introduce the basic concepts of the privilege system.
+
+### Object
+
+An object is an entity to which access can be granted. Unless allowed by a 
grant, access is denied.
+
+Currently, the privilege system in Paimon has three types of objects: CATALOG, 
DATABASE and TABLE.
+Objects have a logical hierarchy, which is related to the concept they 
represent.
+For example:
+* If a user is granted a privilege on the catalog,
+he will also have this privilege on all databases and all tables in the 
catalog.
+* If a user is granted a privilege on the database,
+he will also have this privilege on all tables in that database.
+* If a user is revoked a privilege from the catalog,
+he will also lose this privilege on all databases and all tables in the 
catalog.
+* If a user is revoked a privilege from the database, 
+he will also lose this privilege on all tables in that database.
+
+### Privilege
+
+A privilege is a defined level of access to an object.
+Multiple privileges can be used to control the granularity of access granted 
on an object.
+Privileges are object-specific. Different objects may have different 
privileges.
+
+Currently, we support the following privileges.
+
+| Privilege   | Description                                                    
                           | Can be Granted on        |
+|-------------|-------------------------------------------------------------------------------------------|--------------------------|
+| SELECT      | Queries data in a table.                                       
                           | TABLE, DATABASE, CATALOG |
+| INSERT      | Inserts, updates or drops data in a table. Creates or drops 
tags and branches in a table. | TABLE, DATABASE, CATALOG |
+| ALTER_TABLE | Alters metadata of a table, including table name, column 
names, table options, etc.       | TABLE, DATABASE, CATALOG |
+| DROP_TABLE | Drops a table. | TABLE, DATABASE, CATALOG |
+| CREATE_TABLE | Creates a table in a database. | DATABASE, CATALOG |
+| DROP_DATABASE | Drops a database. | DATABASE, CATALOG |
+| CREATE_DATABASE | Creates a database in the catalog. | CATALOG |
+| ADMIN | Creates or drops privileged users, grants or revokes privileges from 
users in a catalog. | CATALOG |
+
+### User
+
+The entity to which privileges can be granted. Users are authenticated by 
their password.
+
+When the privilege system is enabled, two special users will be created 
automatically.
+
+* The `root` user,
+which is identified by the provided root password when enabling the privilege 
system.
+This user always has all privileges in the catalog.
+* The `anonymous` user.
+This is the default user if no username and password is provided when creating 
the catalog.
+
+## Enable Privileges
+
+Paimon currently only supports file-based privilege system.
+Only catalogs with `'metastore' = 'filesystem'` (the default value) or 
`'metastore' = 'hive'` support such privilege system.
+
+To enable the privilege system on a filesystem / Hive catalog, do the 
following steps.
+
+{{< tabs "enable-privileges" >}}
+
+{{< tab "Flink 1.18+" >}}
+Run the following Flink SQL.
+```sql
+-- use the catalog where you want to enable the privilege system
+USE CATALOG `my-catalog`;
+    
+-- initialize privilege system by providing a root password
+-- change 'root-password' to the password you want
+CALL sys.init_file_based_privilege('root-password');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+After the privilege system is enabled,
+please re-create the catalog and authenticate as `root` to create other users 
and grant them privileges.
+
+{{< hint info >}}
+Privilege system does not affect existing catalogs.
+That is, these catalogs can still access and modify the tables freely.
+Please drop and re-create all catalogs with the desired warehouse path
+if you want to use the privilege system in these catalogs.
+{{< /hint >}}
+
+## Accessing Privileged Catalogs
+
+To access a privileged catalog and to be authenticated as a user,
+you need to define `user` and `password` catalog options when creating the 
catalog.
+For example, the following SQL creates a catalog while trying to be 
authenticated as `root`,
+whose password is `mypassword`.
+
+{{< tabs "access-catalog" >}}
+
+{{< tab "Flink" >}}
+```sql
+CREATE CATALOG `my-catalog` WITH (
+    'type' = 'paimon',
+    -- ...
+    'user' = 'root',
+    'password' = 'mypassword'
+);
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Creating Users
+
+You must be authenticated as a user with `ADMIN` privilege (for example, 
`root`) to perform this operation.
+
+Do the following steps to create a user in the privilege system.
+
+{{< tabs "create-users" >}}
+
+{{< tab "Flink 1.18+" >}}
+Run the following Flink SQL.
+```sql
+-- use the catalog where you want to create a user
+-- you must be authenticated as a user with ADMIN privilege in this catalog
+USE CATALOG `my-catalog`;
+
+-- create a user authenticated by the specified password
+-- change 'user' and 'password' to the username and password you want
+CALL sys.create_privileged_user('user', 'password');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Dropping Users
+
+You must be authenticated as a user with `ADMIN` privilege (for example, 
`root`) to perform this operation.
+
+Do the following steps to drop a user in the privilege system.
+
+{{< tabs "drop-users" >}}
+
+{{< tab "Flink 1.18+" >}}
+Run the following Flink SQL.
+```sql
+-- use the catalog where you want to drop a user
+-- you must be authenticated as a user with ADMIN privilege in this catalog
+USE CATALOG `my-catalog`;
+
+-- change 'user' to the username you want to drop
+CALL sys.drop_privileged_user('user');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Granting Privileges to Users
+
+You must be authenticated as a user with `ADMIN` privilege (for example, 
`root`) to perform this operation.
+
+Do the following steps to grant a user with privilege in the privilege system.
+
+{{< tabs "grant-to-users" >}}
+
+{{< tab "Flink 1.18+" >}}
+Run the following Flink SQL.
+```sql
+-- use the catalog where you want to drop a user
+-- you must be authenticated as a user with ADMIN privilege in this catalog
+USE CATALOG `my-catalog`;
+
+-- you can change 'user' to the username you want, and 'SELECT' to other 
privilege you want
+-- grant 'user' with privilege 'SELECT' on the whole catalog
+CALL sys.grant_privilege_to_user('user', 'SELECT');
+-- grant 'user' with privilege 'SELECT' on database my_db
+CALL sys.grant_privilege_to_user('user', 'SELECT', 'my_db');
+-- grant 'user' with privilege 'SELECT' on table my_db.my_tbl
+CALL sys.grant_privilege_to_user('user', 'SELECT', 'my_db', 'my_tbl');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Revoking Privileges to Users
+
+You must be authenticated as a user with `ADMIN` privilege (for example, 
`root`) to perform this operation.
+
+Do the following steps to revoke a privilege from user in the privilege system.
+
+{{< tabs "revoke-from-users" >}}
+
+{{< tab "Flink 1.18+" >}}
+Run the following Flink SQL.
+```sql
+-- use the catalog where you want to drop a user
+-- you must be authenticated as a user with ADMIN privilege in this catalog
+USE CATALOG `my-catalog`;
+
+-- you can change 'user' to the username you want, and 'SELECT' to other 
privilege you want
+-- revoke 'user' with privilege 'SELECT' on the whole catalog
+CALL sys.revoke_privilege_from_user('user', 'SELECT');
+-- revoke 'user' with privilege 'SELECT' on database my_db
+CALL sys.revoke_privilege_from_user('user', 'SELECT', 'my_db');
+-- revoke 'user' with privilege 'SELECT' on table my_db.my_tbl
+CALL sys.revoke_privilege_from_user('user', 'SELECT', 'my_db', 'my_tbl');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 07a73d3de..e893c0525 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -62,7 +62,7 @@ import static 
org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
  *
  * @param <T> type of record to read and write.
  */
-public abstract class AbstractFileStore<T> implements FileStore<T> {
+abstract class AbstractFileStore<T> implements FileStore<T> {
 
     protected final FileIO fileIO;
     protected final SchemaManager schemaManager;
@@ -73,7 +73,7 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
 
     @Nullable private final SegmentsCache<String> writeManifestCache;
 
-    public AbstractFileStore(
+    protected AbstractFileStore(
             FileIO fileIO,
             SchemaManager schemaManager,
             TableSchema schema,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 4a4fb04fd..7d26a2197 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -85,6 +85,16 @@ public abstract class AbstractCatalog implements Catalog {
         this.catalogOptions = options;
     }
 
+    @Override
+    public Map<String, String> options() {
+        return catalogOptions.toMap();
+    }
+
+    @Override
+    public FileIO fileIO() {
+        return fileIO;
+    }
+
     @Override
     public Optional<CatalogLockFactory> lockFactory() {
         if (!lockEnabled()) {
@@ -369,17 +379,6 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    /**
-     * Get the warehouse path for the catalog if exists.
-     *
-     * @return The catalog warehouse path.
-     */
-    public abstract String warehouse();
-
-    public Map<String, String> options() {
-        return catalogOptions.toMap();
-    }
-
     protected abstract TableSchema getDataTableSchema(Identifier identifier)
             throws TableNotExistException;
 
@@ -409,10 +408,6 @@ public abstract class AbstractCatalog implements Catalog {
         tableDefaultOptions.forEach(options::putIfAbsent);
     }
 
-    public FileIO fileIO() {
-        return fileIO;
-    }
-
     private String[] tableAndSystemName(Identifier identifier) {
         String[] splits = StringUtils.split(identifier.getObjectName(), 
SYSTEM_TABLE_SPLITTER);
         if (splits.length != 2) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 99b71e8de..4f6341417 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -45,6 +46,14 @@ public interface Catalog extends AutoCloseable {
     String SYSTEM_TABLE_SPLITTER = "$";
     String SYSTEM_DATABASE_NAME = "sys";
 
+    /** Warehouse root path containing all database directories in this 
catalog. */
+    String warehouse();
+
+    /** Catalog options. */
+    Map<String, String> options();
+
+    FileIO fileIO();
+
     /**
      * Get lock factory from catalog. Lock is used to support multiple 
concurrent writes on the
      * object store.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
index 8f4b6eede..8a0b1643f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
@@ -20,6 +20,9 @@ package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.privilege.FileBasedPrivilegeManager;
+import org.apache.paimon.privilege.PrivilegeManager;
+import org.apache.paimon.privilege.PrivilegedCatalog;
 import org.apache.paimon.table.TableType;
 
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
@@ -40,6 +43,19 @@ public class FileSystemCatalogFactory implements 
CatalogFactory {
             throw new IllegalArgumentException(
                     "Only managed table is supported in File system catalog.");
         }
-        return new FileSystemCatalog(fileIO, warehouse, context.options());
+
+        Catalog catalog = new FileSystemCatalog(fileIO, warehouse, 
context.options());
+
+        PrivilegeManager privilegeManager =
+                new FileBasedPrivilegeManager(
+                        warehouse.toString(),
+                        fileIO,
+                        context.options().get(PrivilegedCatalog.USER),
+                        context.options().get(PrivilegedCatalog.PASSWORD));
+        if (privilegeManager.privilegeEnabled()) {
+            catalog = new PrivilegedCatalog(catalog, privilegeManager);
+        }
+
+        return catalog;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/AllGrantedPrivilegeChecker.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/AllGrantedPrivilegeChecker.java
new file mode 100644
index 000000000..09944681a
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/AllGrantedPrivilegeChecker.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+
+/** Allows current user to perform all operations. */
+public class AllGrantedPrivilegeChecker implements PrivilegeChecker {
+
+    @Override
+    public void assertCanSelect(Identifier identifier) {}
+
+    @Override
+    public void assertCanInsert(Identifier identifier) {}
+
+    @Override
+    public void assertCanAlterTable(Identifier identifier) {}
+
+    @Override
+    public void assertCanDropTable(Identifier identifier) {}
+
+    @Override
+    public void assertCanCreateTable(String databaseName) {}
+
+    @Override
+    public void assertCanDropDatabase(String databaseName) {}
+
+    @Override
+    public void assertCanCreateDatabase() {}
+
+    @Override
+    public void assertCanCreateUser() {}
+
+    @Override
+    public void assertCanDropUser() {}
+
+    @Override
+    public void assertCanGrant(String identifier, PrivilegeType privilege) {}
+
+    @Override
+    public void assertCanRevoke() {}
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        return o != null && getClass() == o.getClass();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/EntityType.java 
b/paimon-core/src/main/java/org/apache/paimon/privilege/EntityType.java
new file mode 100644
index 000000000..134de1715
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/EntityType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.annotation.Public;
+
+/**
+ * Entity type in privilege system.
+ *
+ * <p>Supports identity-based access control (grant privilege directly to 
user) and role-based
+ * access control (grant privilege to role, then assign role to user).
+ *
+ * @since 0.8.0
+ */
+@Public
+public enum EntityType {
+    USER,
+    ROLE
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
new file mode 100644
index 000000000..ec86e0cd1
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link PrivilegeManager} based on user and privilege system tables. The 
directories of these
+ * system tables are created at the root of warehouse (table-root/user.sys and
+ * table-root/privilege.sys).
+ *
+ * <p>User table is the table which stores all user information. The schema of 
user table is:
+ *
+ * <ul>
+ *   <li>user (string): user name (primary key)
+ *   <li>sha256 (bytes): sha256 of password
+ * </ul>
+ *
+ * <p>Privilege table is the table storing what privileges each user have. Its 
schema is:
+ *
+ * <ul>
+ *   <li>name (string): user or role name (primary key)
+ *   <li>entity_type (string): user or role (primary key)
+ *   <li>identifier (string): identifier of object (primary key)
+ *   <li>privilege (string): name of privilege (primary key), see {@link 
PrivilegeType}
+ * </ul>
+ */
+public class FileBasedPrivilegeManager implements PrivilegeManager {
+
+    private static final String USER_TABLE_DIR = "user.sys";
+    private static final RowType USER_TABLE_TYPE =
+            RowType.of(
+                    new DataType[] {DataTypes.STRING(), DataTypes.BYTES()},
+                    new String[] {"user", "sha256"});
+
+    private static final String PRIVILEGE_TABLE_DIR = "privilege.sys";
+    private static final RowType PRIVILEGE_TABLE_TYPE =
+            RowType.of(
+                    new DataType[] {
+                        DataTypes.STRING(),
+                        DataTypes.STRING(),
+                        DataTypes.STRING(),
+                        DataTypes.STRING()
+                    },
+                    new String[] {"name", "entity_type", "identifier", 
"privilege"});
+
+    private final String warehouse;
+    private final FileIO fileIO;
+    private final String user;
+    private final byte[] sha256;
+
+    private Table userTable;
+    private Table privilegeTable;
+
+    public FileBasedPrivilegeManager(
+            String warehouse, FileIO fileIO, String user, String password) {
+        this.warehouse = warehouse;
+        this.fileIO = fileIO;
+        this.user = user;
+        this.sha256 = getSha256(password);
+    }
+
+    @Override
+    public boolean privilegeEnabled() {
+        return getUserTable(false) != null && getPrivilegeTable(false) != null;
+    }
+
+    @Override
+    public void initializePrivilege(String rootPassword) {
+        if (privilegeEnabled()) {
+            throw new IllegalStateException(
+                    "Privilege system is already enabled in warehouse " + 
warehouse);
+        }
+
+        createUserTable();
+        createUserImpl(USER_ROOT, rootPassword);
+        createUserImpl(USER_ANONYMOUS, PASSWORD_ANONYMOUS);
+
+        createPrivilegeTable();
+    }
+
+    @Override
+    public void createUser(String user, String password) {
+        getPrivilegeChecker().assertCanCreateUser();
+        if (userExists(user)) {
+            throw new IllegalArgumentException("User " + user + " already 
exists");
+        }
+        createUserImpl(user, password);
+    }
+
+    @Override
+    public void dropUser(String user) {
+        getPrivilegeChecker().assertCanDropUser();
+        Preconditions.checkArgument(!USER_ROOT.equals(user), USER_ROOT + " 
cannot be dropped");
+        Preconditions.checkArgument(
+                !USER_ANONYMOUS.equals(user), USER_ANONYMOUS + " cannot be 
dropped");
+        dropUserImpl(user);
+    }
+
+    @Override
+    public void grant(String user, String identifier, PrivilegeType privilege) 
{
+        getPrivilegeChecker().assertCanGrant(identifier, privilege);
+        Preconditions.checkArgument(
+                !USER_ROOT.equals(user), "Cannot change privilege for user " + 
USER_ROOT);
+        if (!userExists(user)) {
+            throw new IllegalArgumentException("User " + user + " does not 
exist");
+        }
+        grantImpl(
+                Collections.singletonList(
+                        new PrivilegeEntry(user, EntityType.USER, identifier, 
privilege)));
+    }
+
+    @Override
+    public int revoke(String user, String identifier, PrivilegeType privilege) 
{
+        getPrivilegeChecker().assertCanRevoke();
+        Preconditions.checkArgument(
+                !USER_ROOT.equals(user), "Cannot change privilege for user " + 
USER_ROOT);
+        if (!userExists(user)) {
+            throw new IllegalArgumentException("User " + user + " does not 
exist");
+        }
+        int count = revokeImpl(user, identifier, privilege);
+        Preconditions.checkArgument(
+                count > 0,
+                String.format(
+                        "User %s does not have privilege %s on %s. "
+                                + "It's possible that the user has such 
privilege on a higher level. "
+                                + "Please check the privilege table.",
+                        user, privilege, identifier));
+        return count;
+    }
+
+    @Override
+    public void objectRenamed(String oldName, String newName) {
+        Table privilegeTable = getPrivilegeTable(true);
+        PredicateBuilder predicateBuilder = new 
PredicateBuilder(PRIVILEGE_TABLE_TYPE);
+        Predicate predicate = predicateBuilder.equal(2, 
BinaryString.fromString(oldName));
+
+        BatchWriteBuilder writeBuilder = privilegeTable.newBatchWriteBuilder();
+        try (CloseableIterator<InternalRow> it = read(privilegeTable, 
predicate);
+                BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            while (it.hasNext()) {
+                InternalRow row = it.next();
+                GenericRow replaced =
+                        GenericRow.of(
+                                row.getString(0),
+                                row.getString(1),
+                                BinaryString.fromString(newName),
+                                row.getString(3));
+                write.write(replaced);
+            }
+            commit.commit(write.prepareCommit());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void objectDropped(String identifier) {
+        Table privilegeTable = getPrivilegeTable(true);
+        PredicateBuilder predicateBuilder = new 
PredicateBuilder(PRIVILEGE_TABLE_TYPE);
+        Predicate predicate = predicateBuilder.startsWith(2, 
BinaryString.fromString(identifier));
+        deleteAll(privilegeTable, predicate);
+    }
+
+    @Override
+    public PrivilegeChecker getPrivilegeChecker() {
+        assertUserPassword();
+        if (USER_ROOT.equals(user)) {
+            return new AllGrantedPrivilegeChecker();
+        }
+
+        Table privilegeTable = getPrivilegeTable(true);
+        PredicateBuilder predicateBuilder = new 
PredicateBuilder(PRIVILEGE_TABLE_TYPE);
+        Predicate predicate =
+                PredicateBuilder.and(
+                        predicateBuilder.equal(0, 
BinaryString.fromString(user)),
+                        predicateBuilder.equal(1, 
BinaryString.fromString(EntityType.USER.name())));
+
+        Map<String, Set<PrivilegeType>> privileges = new HashMap<>();
+        try (CloseableIterator<InternalRow> it = read(privilegeTable, 
predicate)) {
+            while (it.hasNext()) {
+                InternalRow row = it.next();
+                privileges
+                        .computeIfAbsent(row.getString(2).toString(), ignore 
-> new HashSet<>())
+                        
.add(PrivilegeType.valueOf(row.getString(3).toString()));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return new PrivilegeCheckerImpl(user, privileges);
+    }
+
+    private void createUserImpl(String user, String password) {
+        byte[] sha256 = getSha256(password);
+        BatchWriteBuilder writeBuilder = 
getUserTable(true).newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(BinaryString.fromString(user), sha256));
+            commit.commit(write.prepareCommit());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void dropUserImpl(String user) {
+        BatchWriteBuilder writeBuilder = 
getUserTable(true).newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(
+                    GenericRow.ofKind(RowKind.DELETE, 
BinaryString.fromString(user), new byte[0]));
+            commit.commit(write.prepareCommit());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        PredicateBuilder predicateBuilder = new 
PredicateBuilder(PRIVILEGE_TABLE_TYPE);
+        Predicate predicate =
+                PredicateBuilder.and(
+                        predicateBuilder.equal(0, 
BinaryString.fromString(user)),
+                        predicateBuilder.equal(1, 
BinaryString.fromString(EntityType.USER.name())));
+        deleteAll(getPrivilegeTable(true), predicate);
+    }
+
+    private void grantImpl(List<PrivilegeEntry> entries) {
+        BatchWriteBuilder writeBuilder = 
getPrivilegeTable(true).newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            for (PrivilegeEntry entry : entries) {
+                write.write(
+                        GenericRow.of(
+                                BinaryString.fromString(entry.name),
+                                
BinaryString.fromString(entry.entityType.name()),
+                                BinaryString.fromString(entry.identifier),
+                                
BinaryString.fromString(entry.privilege.name())));
+            }
+            commit.commit(write.prepareCommit());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private int revokeImpl(String user, String identifier, PrivilegeType 
privilege) {
+        PredicateBuilder predicateBuilder = new 
PredicateBuilder(PRIVILEGE_TABLE_TYPE);
+        Predicate predicate =
+                PredicateBuilder.and(
+                        predicateBuilder.equal(0, 
BinaryString.fromString(user)),
+                        predicateBuilder.equal(1, 
BinaryString.fromString(EntityType.USER.name())),
+                        predicateBuilder.startsWith(2, 
BinaryString.fromString(identifier)),
+                        predicateBuilder.equal(3, 
BinaryString.fromString(privilege.name())));
+        return deleteAll(getPrivilegeTable(true), predicate);
+    }
+
+    private static class PrivilegeEntry {
+        String name;
+        EntityType entityType;
+        String identifier;
+        PrivilegeType privilege;
+
+        private PrivilegeEntry(
+                String name, EntityType entityType, String identifier, 
PrivilegeType privilege) {
+            this.name = name;
+            this.entityType = entityType;
+            this.identifier = identifier;
+            this.privilege = privilege;
+        }
+    }
+
+    private boolean userExists(String user) {
+        Table userTable = getUserTable(true);
+        Predicate predicate =
+                new PredicateBuilder(USER_TABLE_TYPE).equal(0, 
BinaryString.fromString(user));
+        try (CloseableIterator<InternalRow> it = read(userTable, predicate)) {
+            return it.hasNext();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void assertUserPassword() {
+        Table userTable = getUserTable(true);
+        PredicateBuilder predicateBuilder = new 
PredicateBuilder(USER_TABLE_TYPE);
+        Predicate predicate =
+                PredicateBuilder.and(
+                        predicateBuilder.equal(0, 
BinaryString.fromString(user)),
+                        predicateBuilder.equal(1, sha256));
+
+        try (CloseableIterator<InternalRow> it = read(userTable, predicate)) {
+            if (it.hasNext()) {
+                return;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        throw new IllegalArgumentException("User " + user + " not found, or 
password incorrect.");
+    }
+
+    private Table getUserTable(boolean assertExists) {
+        userTable = getTable(userTable, USER_TABLE_DIR, assertExists);
+        return userTable;
+    }
+
+    private Table getPrivilegeTable(boolean assertExists) {
+        privilegeTable = getTable(privilegeTable, PRIVILEGE_TABLE_DIR, 
assertExists);
+        return privilegeTable;
+    }
+
+    private Table getTable(Table lazy, String dir, boolean assertExists) {
+        if (lazy != null) {
+            return lazy;
+        }
+
+        Path tableRoot = new Path(warehouse, dir);
+        boolean tableExists;
+        try {
+            tableExists = fileIO.exists(tableRoot);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        if (tableExists) {
+            return FileStoreTableFactory.create(fileIO, tableRoot);
+        } else if (assertExists) {
+            throw new RuntimeException(
+                    "Privilege system is not enabled in warehouse " + 
warehouse + ".");
+        } else {
+            return null;
+        }
+    }
+
+    private void createUserTable() {
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, 1);
+        Path tableRoot = new Path(warehouse, USER_TABLE_DIR);
+        SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot);
+        try {
+            schemaManager.createTable(
+                    new Schema(
+                            USER_TABLE_TYPE.getFields(),
+                            Collections.emptyList(),
+                            Collections.singletonList("user"),
+                            options.toMap(),
+                            ""));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void createPrivilegeTable() {
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, 1);
+        Path tableRoot = new Path(warehouse, PRIVILEGE_TABLE_DIR);
+        SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot);
+        try {
+            schemaManager.createTable(
+                    new Schema(
+                            PRIVILEGE_TABLE_TYPE.getFields(),
+                            Collections.emptyList(),
+                            Arrays.asList("name", "entity_type", "privilege", 
"identifier"),
+                            options.toMap(),
+                            ""));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private CloseableIterator<InternalRow> read(Table table, Predicate 
predicate) {
+        ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        try {
+            return new RecordReaderIterator<>(
+                    readBuilder.newRead().executeFilter().createReader(plan));
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private int deleteAll(Table table, Predicate predicate) {
+        int count = 0;
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (CloseableIterator<InternalRow> it = read(table, predicate);
+                BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            while (it.hasNext()) {
+                InternalRow row = it.next();
+                row.setRowKind(RowKind.DELETE);
+                write.write(row);
+                count++;
+            }
+            commit.commit(write.prepareCommit());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return count;
+    }
+
+    private byte[] getSha256(String s) {
+        try {
+            return 
MessageDigest.getInstance("SHA-256").digest(s.getBytes(StandardCharsets.UTF_8));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/NoPrivilegeException.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/NoPrivilegeException.java
new file mode 100644
index 000000000..c868aebe7
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/NoPrivilegeException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/** Thrown when tries to perform an operation but the current user does not 
have the privilege. */
+public class NoPrivilegeException extends RuntimeException {
+
+    public NoPrivilegeException(
+            String user, String objectType, String identifier, 
PrivilegeType... privilege) {
+        super(
+                String.format(
+                        "User %s doesn't have privilege %s on %s %s",
+                        user,
+                        Arrays.stream(privilege)
+                                .map(Enum::name)
+                                .collect(Collectors.joining(" or ")),
+                        objectType,
+                        identifier));
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeChecker.java 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeChecker.java
new file mode 100644
index 000000000..ebd579bdb
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeChecker.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.io.Serializable;
+
+/** Check if current user has privilege to perform related operations. */
+public interface PrivilegeChecker extends Serializable {
+
+    void assertCanSelect(Identifier identifier);
+
+    void assertCanInsert(Identifier identifier);
+
+    void assertCanAlterTable(Identifier identifier);
+
+    void assertCanDropTable(Identifier identifier);
+
+    void assertCanCreateTable(String databaseName);
+
+    void assertCanDropDatabase(String databaseName);
+
+    void assertCanCreateDatabase();
+
+    void assertCanCreateUser();
+
+    void assertCanDropUser();
+
+    void assertCanGrant(String identifier, PrivilegeType privilege);
+
+    void assertCanRevoke();
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeCheckerImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeCheckerImpl.java
new file mode 100644
index 000000000..19c1813ee
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeCheckerImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Default implementation of {@link PrivilegeChecker}. */
+public class PrivilegeCheckerImpl implements PrivilegeChecker {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String user;
+    private final Map<String, Set<PrivilegeType>> privileges;
+
+    public PrivilegeCheckerImpl(String user, Map<String, Set<PrivilegeType>> 
privileges) {
+        this.user = user;
+        this.privileges = privileges;
+    }
+
+    @Override
+    public void assertCanSelect(Identifier identifier) {
+        if (!check(identifier.getFullName(), PrivilegeType.SELECT)) {
+            throw new NoPrivilegeException(
+                    user, "table", identifier.getFullName(), 
PrivilegeType.SELECT);
+        }
+    }
+
+    @Override
+    public void assertCanInsert(Identifier identifier) {
+        if (!check(identifier.getFullName(), PrivilegeType.INSERT)) {
+            throw new NoPrivilegeException(
+                    user, "table", identifier.getFullName(), 
PrivilegeType.INSERT);
+        }
+    }
+
+    @Override
+    public void assertCanAlterTable(Identifier identifier) {
+        if (!check(identifier.getFullName(), PrivilegeType.ALTER_TABLE)) {
+            throw new NoPrivilegeException(
+                    user, "table", identifier.getFullName(), 
PrivilegeType.ALTER_TABLE);
+        }
+    }
+
+    @Override
+    public void assertCanDropTable(Identifier identifier) {
+        if (!check(identifier.getFullName(), PrivilegeType.DROP_TABLE)) {
+            throw new NoPrivilegeException(
+                    user, "table", identifier.getFullName(), 
PrivilegeType.DROP_TABLE);
+        }
+    }
+
+    @Override
+    public void assertCanCreateTable(String databaseName) {
+        if (!check(databaseName, PrivilegeType.CREATE_TABLE)) {
+            throw new NoPrivilegeException(
+                    user, "database", databaseName, 
PrivilegeType.CREATE_TABLE);
+        }
+    }
+
+    @Override
+    public void assertCanDropDatabase(String databaseName) {
+        if (!check(databaseName, PrivilegeType.DROP_DATABASE)) {
+            throw new NoPrivilegeException(
+                    user, "database", databaseName, 
PrivilegeType.DROP_DATABASE);
+        }
+    }
+
+    @Override
+    public void assertCanCreateDatabase() {
+        if (!check(
+                FileBasedPrivilegeManager.IDENTIFIER_WHOLE_CATALOG,
+                PrivilegeType.CREATE_DATABASE)) {
+            throw new NoPrivilegeException(
+                    user,
+                    "catalog",
+                    FileBasedPrivilegeManager.IDENTIFIER_WHOLE_CATALOG,
+                    PrivilegeType.DROP_DATABASE);
+        }
+    }
+
+    @Override
+    public void assertCanCreateUser() {
+        assertHasAdmin();
+    }
+
+    @Override
+    public void assertCanDropUser() {
+        assertHasAdmin();
+    }
+
+    @Override
+    public void assertCanGrant(String identifier, PrivilegeType privilege) {
+        assertHasAdmin();
+    }
+
+    @Override
+    public void assertCanRevoke() {
+        assertHasAdmin();
+    }
+
+    private void assertHasAdmin() {
+        if (!check(FileBasedPrivilegeManager.IDENTIFIER_WHOLE_CATALOG, 
PrivilegeType.ADMIN)) {
+            throw new NoPrivilegeException(
+                    user,
+                    "catalog",
+                    FileBasedPrivilegeManager.IDENTIFIER_WHOLE_CATALOG,
+                    PrivilegeType.ADMIN);
+        }
+    }
+
+    private boolean check(String identifier, PrivilegeType privilege) {
+        Set<PrivilegeType> set = privileges.get(identifier);
+        if (set != null && set.contains(privilege)) {
+            return true;
+        } else if (identifier.isEmpty()) {
+            return false;
+        } else {
+            return check(
+                    identifier.substring(0, 
Math.max(identifier.lastIndexOf('.'), 0)), privilege);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PrivilegeCheckerImpl that = (PrivilegeCheckerImpl) o;
+        return Objects.equals(user, that.user) && Objects.equals(privileges, 
that.privileges);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeManager.java 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeManager.java
new file mode 100644
index 000000000..52eb81ea5
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeManager.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+/**
+ * Interface for the privilege system which supports identity-based and 
role-based access control.
+ *
+ * <p>When the privilege system is initialized two special users, root and 
anonymous, will be
+ * created by default.
+ *
+ * <ul>
+ *   <li>root is the user with all privileges. The password of root is set 
when calling {@link
+ *       PrivilegeManager#initializePrivilege}.
+ *   <li>anonymous is the default username if no user and password are 
provided when creating the
+ *       catalog. The default password of anonymous is <code>anonymous</code>.
+ * </ul>
+ *
+ * <p>The privilege system also follows a hierarchical model. That is, If a 
user has a privilege on
+ * an identifier A, he also has this privilege on identifier B, where A is a 
prefix of B.
+ * Identifiers can be
+ *
+ * <ul>
+ *   <li>the whole catalog (identifier is an empty string)
+ *   <li>a database (identifier is &lt;database-name&gt;)
+ *   <li>a table (identifier is &lt;database-name&gt;.&lt;table-name&gt;)
+ * </ul>
+ */
+public interface PrivilegeManager {
+
+    String USER_ROOT = "root";
+    String USER_ANONYMOUS = "anonymous";
+    String PASSWORD_ANONYMOUS = "anonymous";
+    String IDENTIFIER_WHOLE_CATALOG = "";
+
+    /** Check if the privilege system is enabled. */
+    boolean privilegeEnabled();
+
+    /**
+     * Initialize the privilege system if not enabled. Also creates two 
special users: root and
+     * anonymous.
+     */
+    void initializePrivilege(String rootPassword);
+
+    /** Create {@code user} with {@code password}. */
+    void createUser(String user, String password);
+
+    /** Remove {@code user} from the privilege system. */
+    void dropUser(String user);
+
+    /** Grant {@code user} with {@code privilege} on {@code identifier}. */
+    void grant(String user, String identifier, PrivilegeType privilege);
+
+    /**
+     * Revoke {@code privilege} from {@code user} on {@code identifier}. Note 
that {@code user} will
+     * also lose {@code privilege} on all descendants of {@code identifier}.
+     */
+    int revoke(String user, String identifier, PrivilegeType privilege);
+
+    /**
+     * Notify the privilege system that the identifier of an object is changed 
from {@code oldName}
+     * to {@code newName}.
+     */
+    void objectRenamed(String oldName, String newName);
+
+    /** Notify the privilege system that the object with {@code identifier} is 
dropped. */
+    void objectDropped(String identifier);
+
+    /**
+     * Get {@link PrivilegeChecker} of this privilege system to check if a 
user has specific
+     * privileges on an object.
+     */
+    PrivilegeChecker getPrivilegeChecker();
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeType.java 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeType.java
new file mode 100644
index 000000000..375f5030d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegeType.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.annotation.Public;
+
+/**
+ * Types of privilege.
+ *
+ * @since 0.7.0
+ */
+@Public
+public enum PrivilegeType {
+    SELECT(PrivilegeTarget.TABLE),
+    INSERT(PrivilegeTarget.TABLE),
+    ALTER_TABLE(PrivilegeTarget.TABLE),
+    DROP_TABLE(PrivilegeTarget.TABLE),
+
+    CREATE_TABLE(PrivilegeTarget.DATABASE),
+    DROP_DATABASE(PrivilegeTarget.DATABASE),
+
+    CREATE_DATABASE(PrivilegeTarget.CATALOG),
+    // you can create and drop users, grant and revoke any privileges to or 
from others
+    ADMIN(PrivilegeTarget.CATALOG);
+
+    private final PrivilegeTarget target;
+
+    PrivilegeType(PrivilegeTarget target) {
+        this.target = target;
+    }
+
+    public boolean canGrantOnCatalog() {
+        return PrivilegeTarget.CATALOG.equals(target) || canGrantOnDatabase();
+    }
+
+    public boolean canGrantOnDatabase() {
+        return PrivilegeTarget.DATABASE.equals(target) || canGrantOnTable();
+    }
+
+    public boolean canGrantOnTable() {
+        return PrivilegeTarget.TABLE.equals(target);
+    }
+
+    private enum PrivilegeTarget {
+        CATALOG,
+        DATABASE,
+        TABLE
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
new file mode 100644
index 000000000..7c2ab9493
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLockContext;
+import org.apache.paimon.catalog.CatalogLockFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** {@link Catalog} which supports privilege system. */
+public class PrivilegedCatalog implements Catalog {
+
+    public static final ConfigOption<String> USER =
+            
ConfigOptions.key("user").stringType().defaultValue(PrivilegeManager.USER_ANONYMOUS);
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .defaultValue(PrivilegeManager.PASSWORD_ANONYMOUS);
+
+    private final Catalog wrapped;
+    private final PrivilegeManager privilegeManager;
+
+    public PrivilegedCatalog(Catalog wrapped, PrivilegeManager 
privilegeManager) {
+        this.wrapped = wrapped;
+        this.privilegeManager = privilegeManager;
+    }
+
+    @Override
+    public boolean caseSensitive() {
+        return wrapped.caseSensitive();
+    }
+
+    @Override
+    public String warehouse() {
+        return wrapped.warehouse();
+    }
+
+    @Override
+    public Map<String, String> options() {
+        return wrapped.options();
+    }
+
+    @Override
+    public FileIO fileIO() {
+        return wrapped.fileIO();
+    }
+
+    @Override
+    public Optional<CatalogLockFactory> lockFactory() {
+        return wrapped.lockFactory();
+    }
+
+    @Override
+    public Optional<CatalogLockContext> lockContext() {
+        return wrapped.lockContext();
+    }
+
+    @Override
+    public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier 
identifier) {
+        return wrapped.metastoreClientFactory(identifier);
+    }
+
+    @Override
+    public List<String> listDatabases() {
+        return wrapped.listDatabases();
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) {
+        return wrapped.databaseExists(databaseName);
+    }
+
+    @Override
+    public void createDatabase(String name, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException {
+        privilegeManager.getPrivilegeChecker().assertCanCreateDatabase();
+        wrapped.createDatabase(name, ignoreIfExists);
+    }
+
+    @Override
+    public void createDatabase(String name, boolean ignoreIfExists, 
Map<String, String> properties)
+            throws DatabaseAlreadyExistException {
+        privilegeManager.getPrivilegeChecker().assertCanCreateDatabase();
+        wrapped.createDatabase(name, ignoreIfExists, properties);
+    }
+
+    @Override
+    public Map<String, String> loadDatabaseProperties(String name)
+            throws DatabaseNotExistException {
+        return wrapped.loadDatabaseProperties(name);
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean 
cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException {
+        privilegeManager.getPrivilegeChecker().assertCanDropDatabase(name);
+        wrapped.dropDatabase(name, ignoreIfNotExists, cascade);
+        privilegeManager.objectDropped(name);
+    }
+
+    @Override
+    public List<String> listTables(String databaseName) throws 
DatabaseNotExistException {
+        return wrapped.listTables(databaseName);
+    }
+
+    @Override
+    public boolean tableExists(Identifier identifier) {
+        return wrapped.tableExists(identifier);
+    }
+
+    @Override
+    public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        privilegeManager.getPrivilegeChecker().assertCanDropTable(identifier);
+        wrapped.dropTable(identifier, ignoreIfNotExists);
+        privilegeManager.objectDropped(identifier.getFullName());
+    }
+
+    @Override
+    public void createTable(Identifier identifier, Schema schema, boolean 
ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        
privilegeManager.getPrivilegeChecker().assertCanCreateTable(identifier.getDatabaseName());
+        wrapped.createTable(identifier, schema, ignoreIfExists);
+    }
+
+    @Override
+    public void renameTable(Identifier fromTable, Identifier toTable, boolean 
ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException {
+        privilegeManager.getPrivilegeChecker().assertCanAlterTable(fromTable);
+        wrapped.renameTable(fromTable, toTable, ignoreIfNotExists);
+        Preconditions.checkState(
+                wrapped.tableExists(toTable),
+                "Table "
+                        + toTable
+                        + " does not exist. There might be concurrent 
renaming. "
+                        + "Aborting updates in privilege system.");
+        privilegeManager.objectRenamed(fromTable.getFullName(), 
toTable.getFullName());
+    }
+
+    @Override
+    public void alterTable(Identifier identifier, SchemaChange change, boolean 
ignoreIfNotExists)
+            throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
+        privilegeManager.getPrivilegeChecker().assertCanAlterTable(identifier);
+        wrapped.alterTable(identifier, change, ignoreIfNotExists);
+    }
+
+    @Override
+    public void alterTable(
+            Identifier identifier, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
+            throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
+        privilegeManager.getPrivilegeChecker().assertCanAlterTable(identifier);
+        wrapped.alterTable(identifier, changes, ignoreIfNotExists);
+    }
+
+    @Override
+    public Table getTable(Identifier identifier) throws TableNotExistException 
{
+        Table table = wrapped.getTable(identifier);
+        if (table instanceof FileStoreTable) {
+            return new PrivilegedFileStoreTable(
+                    (FileStoreTable) table, 
privilegeManager.getPrivilegeChecker(), identifier);
+        } else {
+            return table;
+        }
+    }
+
+    @Override
+    public void dropPartition(Identifier identifier, Map<String, String> 
partitions)
+            throws TableNotExistException, PartitionNotExistException {
+        privilegeManager.getPrivilegeChecker().assertCanInsert(identifier);
+        wrapped.dropPartition(identifier, partitions);
+    }
+
+    @Override
+    public void close() throws Exception {
+        wrapped.close();
+    }
+
+    public void createPrivilegedUser(String user, String password) {
+        privilegeManager.createUser(user, password);
+    }
+
+    public void dropPrivilegedUser(String user) {
+        privilegeManager.dropUser(user);
+    }
+
+    public void grantPrivilegeOnCatalog(String user, PrivilegeType privilege) {
+        Preconditions.checkArgument(
+                privilege.canGrantOnCatalog(),
+                "Privilege " + privilege + " can't be granted on a catalog");
+        privilegeManager.grant(user, 
PrivilegeManager.IDENTIFIER_WHOLE_CATALOG, privilege);
+    }
+
+    public void grantPrivilegeOnDatabase(
+            String user, String databaseName, PrivilegeType privilege) {
+        Preconditions.checkArgument(
+                privilege.canGrantOnDatabase(),
+                "Privilege " + privilege + " can't be granted on a database");
+        Preconditions.checkArgument(
+                databaseExists(databaseName), "Database " + databaseName + " 
does not exist");
+        privilegeManager.grant(user, databaseName, privilege);
+    }
+
+    public void grantPrivilegeOnTable(String user, Identifier identifier, 
PrivilegeType privilege) {
+        Preconditions.checkArgument(
+                privilege.canGrantOnTable(),
+                "Privilege " + privilege + " can't be granted on a table");
+        Preconditions.checkArgument(
+                tableExists(identifier), "Table " + identifier + " does not 
exist");
+        privilegeManager.grant(user, identifier.getFullName(), privilege);
+    }
+
+    /** Returns the number of privilege revoked. */
+    public int revokePrivilegeOnCatalog(String user, PrivilegeType privilege) {
+        return privilegeManager.revoke(user, 
PrivilegeManager.IDENTIFIER_WHOLE_CATALOG, privilege);
+    }
+
+    /** Returns the number of privilege revoked. */
+    public int revokePrivilegeOnDatabase(
+            String user, String databaseName, PrivilegeType privilege) {
+        return privilegeManager.revoke(user, databaseName, privilege);
+    }
+
+    /** Returns the number of privilege revoked. */
+    public int revokePrivilegeOnTable(String user, Identifier identifier, 
PrivilegeType privilege) {
+        return privilegeManager.revoke(user, identifier.getFullName(), 
privilege);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
new file mode 100644
index 000000000..cf8e4c357
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.ManifestCacheFilter;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreCommit;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.FileStoreWrite;
+import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.service.ServiceManager;
+import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.TagCallback;
+import org.apache.paimon.tag.TagAutoManager;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** {@link FileStore} with privilege checks. */
+public class PrivilegedFileStore<T> implements FileStore<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStore<T> wrapped;
+    private final PrivilegeChecker privilegeChecker;
+    private final Identifier identifier;
+
+    public PrivilegedFileStore(
+            FileStore<T> wrapped, PrivilegeChecker privilegeChecker, 
Identifier identifier) {
+        this.wrapped = wrapped;
+        this.privilegeChecker = privilegeChecker;
+        this.identifier = identifier;
+    }
+
+    @Override
+    public FileStorePathFactory pathFactory() {
+        return wrapped.pathFactory();
+    }
+
+    @Override
+    public SnapshotManager snapshotManager() {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.snapshotManager();
+    }
+
+    @Override
+    public RowType partitionType() {
+        return wrapped.partitionType();
+    }
+
+    @Override
+    public CoreOptions options() {
+        return wrapped.options();
+    }
+
+    @Override
+    public BucketMode bucketMode() {
+        return wrapped.bucketMode();
+    }
+
+    @Override
+    public FileStoreScan newScan() {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.newScan();
+    }
+
+    @Override
+    public FileStoreScan newScan(String branchName) {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.newScan(branchName);
+    }
+
+    @Override
+    public ManifestList.Factory manifestListFactory() {
+        return wrapped.manifestListFactory();
+    }
+
+    @Override
+    public ManifestFile.Factory manifestFileFactory() {
+        return wrapped.manifestFileFactory();
+    }
+
+    @Override
+    public IndexFileHandler newIndexFileHandler() {
+        return wrapped.newIndexFileHandler();
+    }
+
+    @Override
+    public StatsFileHandler newStatsFileHandler() {
+        return wrapped.newStatsFileHandler();
+    }
+
+    @Override
+    public SplitRead<T> newRead() {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.newRead();
+    }
+
+    @Override
+    public FileStoreWrite<T> newWrite(String commitUser) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newWrite(commitUser);
+    }
+
+    @Override
+    public FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter 
manifestFilter) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newWrite(commitUser, manifestFilter);
+    }
+
+    @Override
+    public FileStoreCommit newCommit(String commitUser) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newCommit(commitUser);
+    }
+
+    @Override
+    public FileStoreCommit newCommit(String commitUser, String branchName) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newCommit(commitUser, branchName);
+    }
+
+    @Override
+    public SnapshotDeletion newSnapshotDeletion() {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newSnapshotDeletion();
+    }
+
+    @Override
+    public TagManager newTagManager() {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newTagManager();
+    }
+
+    @Override
+    public TagDeletion newTagDeletion() {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newTagDeletion();
+    }
+
+    @Nullable
+    @Override
+    public PartitionExpire newPartitionExpire(String commitUser) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newPartitionExpire(commitUser);
+    }
+
+    @Override
+    public TagAutoManager newTagCreationManager() {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newTagCreationManager();
+    }
+
+    @Override
+    public ServiceManager newServiceManager() {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.newServiceManager();
+    }
+
+    @Override
+    public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.mergeSchema(rowType, allowExplicitCast);
+    }
+
+    @Override
+    public List<TagCallback> createTagCallbacks() {
+        return wrapped.createTagCallbacks();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
new file mode 100644
index 000000000..5820e46b4
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestCacheFilter;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.ExpireSnapshots;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.query.LocalTableQuery;
+import org.apache.paimon.table.sink.RowKeyExtractor;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** {@link FileStoreTable} with privilege checks. */
+public class PrivilegedFileStoreTable implements FileStoreTable {
+
+    private final FileStoreTable wrapped;
+    private final PrivilegeChecker privilegeChecker;
+    private final Identifier identifier;
+
+    public PrivilegedFileStoreTable(
+            FileStoreTable wrapped, PrivilegeChecker privilegeChecker, 
Identifier identifier) {
+        this.wrapped = wrapped;
+        this.privilegeChecker = privilegeChecker;
+        this.identifier = identifier;
+    }
+
+    @Override
+    public SnapshotReader newSnapshotReader() {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.newSnapshotReader();
+    }
+
+    @Override
+    public SnapshotReader newSnapshotReader(String branchName) {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.newSnapshotReader(branchName);
+    }
+
+    @Override
+    public CoreOptions coreOptions() {
+        return wrapped.coreOptions();
+    }
+
+    @Override
+    public SnapshotManager snapshotManager() {
+        return wrapped.snapshotManager();
+    }
+
+    @Override
+    public TagManager tagManager() {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.tagManager();
+    }
+
+    @Override
+    public BranchManager branchManager() {
+        privilegeChecker.assertCanSelect(identifier);
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.branchManager();
+    }
+
+    @Override
+    public Path location() {
+        return wrapped.location();
+    }
+
+    @Override
+    public FileIO fileIO() {
+        return wrapped.fileIO();
+    }
+
+    @Override
+    public TableSchema schema() {
+        return wrapped.schema();
+    }
+
+    @Override
+    public FileStore<?> store() {
+        return new PrivilegedFileStore<>(wrapped.store(), privilegeChecker, 
identifier);
+    }
+
+    @Override
+    public BucketMode bucketMode() {
+        return wrapped.bucketMode();
+    }
+
+    @Override
+    public CatalogEnvironment catalogEnvironment() {
+        return wrapped.catalogEnvironment();
+    }
+
+    @Override
+    public Optional<Statistics> statistics() {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.statistics();
+    }
+
+    @Override
+    public FileStoreTable copy(Map<String, String> dynamicOptions) {
+        return new PrivilegedFileStoreTable(
+                wrapped.copy(dynamicOptions), privilegeChecker, identifier);
+    }
+
+    @Override
+    public FileStoreTable copy(TableSchema newTableSchema) {
+        return new PrivilegedFileStoreTable(
+                wrapped.copy(newTableSchema), privilegeChecker, identifier);
+    }
+
+    @Override
+    public void rollbackTo(long snapshotId) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.rollbackTo(snapshotId);
+    }
+
+    @Override
+    public void createTag(String tagName) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.createTag(tagName);
+    }
+
+    @Override
+    public void createTag(String tagName, long fromSnapshotId) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.createTag(tagName, fromSnapshotId);
+    }
+
+    @Override
+    public void createTag(String tagName, Duration timeRetained) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.createTag(tagName, timeRetained);
+    }
+
+    @Override
+    public void createTag(String tagName, long fromSnapshotId, Duration 
timeRetained) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.createTag(tagName, fromSnapshotId, timeRetained);
+    }
+
+    @Override
+    public void deleteTag(String tagName) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.deleteTag(tagName);
+    }
+
+    @Override
+    public void rollbackTo(String tagName) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.rollbackTo(tagName);
+    }
+
+    @Override
+    public void createBranch(String branchName) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.createBranch(branchName);
+    }
+
+    @Override
+    public void createBranch(String branchName, long snapshotId) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.createBranch(branchName, snapshotId);
+    }
+
+    @Override
+    public void createBranch(String branchName, String tagName) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.createBranch(branchName, tagName);
+    }
+
+    @Override
+    public void deleteBranch(String branchName) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.deleteBranch(branchName);
+    }
+
+    @Override
+    public ExpireSnapshots newExpireSnapshots() {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newExpireSnapshots();
+    }
+
+    @Override
+    public ExpireSnapshots newExpireChangelog() {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newExpireChangelog();
+    }
+
+    @Override
+    public FileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
+        return new PrivilegedFileStoreTable(
+                wrapped.copyWithoutTimeTravel(dynamicOptions), 
privilegeChecker, identifier);
+    }
+
+    @Override
+    public FileStoreTable copyWithLatestSchema() {
+        return new PrivilegedFileStoreTable(
+                wrapped.copyWithLatestSchema(), privilegeChecker, identifier);
+    }
+
+    @Override
+    public InnerTableScan newScan() {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.newScan();
+    }
+
+    @Override
+    public InnerStreamTableScan newStreamScan() {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.newStreamScan();
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.newRead();
+    }
+
+    @Override
+    public TableWriteImpl<?> newWrite(String commitUser) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newWrite(commitUser);
+    }
+
+    @Override
+    public TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter 
manifestFilter) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newWrite(commitUser, manifestFilter);
+    }
+
+    @Override
+    public TableCommitImpl newCommit(String commitUser) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newCommit(commitUser);
+    }
+
+    @Override
+    public TableCommitImpl newCommit(String commitUser, String branchName) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newCommit(commitUser, branchName);
+    }
+
+    @Override
+    public LocalTableQuery newLocalTableQuery() {
+        privilegeChecker.assertCanSelect(identifier);
+        return wrapped.newLocalTableQuery();
+    }
+
+    @Override
+    public boolean supportStreamingReadOverwrite() {
+        return wrapped.supportStreamingReadOverwrite();
+    }
+
+    @Override
+    public RowKeyExtractor createRowKeyExtractor() {
+        return wrapped.createRowKeyExtractor();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PrivilegedFileStoreTable that = (PrivilegedFileStoreTable) o;
+        return Objects.equals(wrapped, that.wrapped)
+                && Objects.equals(privilegeChecker, that.privilegeChecker)
+                && Objects.equals(identifier, that.identifier);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/privilege/FileBasedPrivilegeManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/privilege/FileBasedPrivilegeManagerTest.java
new file mode 100644
index 000000000..548a3925f
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/privilege/FileBasedPrivilegeManagerTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link FileBasedPrivilegeManager}. */
+public class FileBasedPrivilegeManagerTest {
+
+    private static final String PASSWORD_ROOT = "123456";
+
+    @TempDir public java.nio.file.Path tempPath;
+    private Path warehouse;
+
+    @BeforeEach
+    public void beforeEach() {
+        warehouse = new Path(TraceableFileIO.SCHEME + "://" + 
tempPath.toString());
+    }
+
+    @AfterEach
+    public void afterEach() {
+        Predicate<Path> pathPredicate = path -> 
path.toString().contains(tempPath.toString());
+        assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
+        assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
+    }
+
+    @Test
+    public void testInitializeMultipleTimes() throws Exception {
+        initPrivilege();
+        assertThrows(IllegalStateException.class, this::initPrivilege);
+    }
+
+    @Test
+    public void testUsers() throws Exception {
+        initPrivilege();
+
+        FileBasedPrivilegeManager rootManager = getPrivilegeManager("root", 
PASSWORD_ROOT);
+        rootManager.createUser("test", "passwd");
+        assertThrows(
+                IllegalArgumentException.class, () -> 
rootManager.createUser("test", "changed"));
+
+        FileBasedPrivilegeManager testManager = getPrivilegeManager("test", 
"passwd");
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> getPrivilegeManager("test", 
"wrong").getPrivilegeChecker());
+        assertThrows(NoPrivilegeException.class, () -> 
testManager.createUser("test2", "passwd"));
+
+        rootManager.dropUser("test");
+        rootManager.dropUser("test");
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> getPrivilegeManager("test", 
"passwd").getPrivilegeChecker());
+        assertThrows(IllegalArgumentException.class, () -> 
rootManager.dropUser("root"));
+        assertThrows(IllegalArgumentException.class, () -> 
rootManager.dropUser("anonymous"));
+    }
+
+    @Test
+    public void testGrantAndRevoke() throws Exception {
+        initPrivilege();
+
+        FileBasedPrivilegeManager rootManager = getPrivilegeManager("root", 
PASSWORD_ROOT);
+        rootManager.createUser("test", "passwd");
+        rootManager.grant("test", "my_db", PrivilegeType.SELECT);
+        rootManager.grant("test", "another_db.my_tbl", PrivilegeType.SELECT);
+        rootManager.grant("test", "another_db.my_tbl", PrivilegeType.INSERT);
+        rootManager.grant("test", "another_db.another_tbl", 
PrivilegeType.INSERT);
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> rootManager.grant("test2", "my_db", 
PrivilegeType.SELECT));
+        FileBasedPrivilegeManager testManager = getPrivilegeManager("test", 
"passwd");
+
+        {
+            PrivilegeChecker checker = testManager.getPrivilegeChecker();
+
+            checker.assertCanSelect(Identifier.create("my_db", "my_tbl"));
+            checker.assertCanSelect(Identifier.create("my_db", "another_tbl"));
+            checker.assertCanSelect(Identifier.create("another_db", "my_tbl"));
+            assertThrows(
+                    NoPrivilegeException.class,
+                    () -> 
checker.assertCanSelect(Identifier.create("another_db", "another_tbl")));
+
+            assertThrows(
+                    NoPrivilegeException.class,
+                    () -> checker.assertCanInsert(Identifier.create("my_db", 
"my_tbl")));
+            assertThrows(
+                    NoPrivilegeException.class,
+                    () -> checker.assertCanInsert(Identifier.create("my_db", 
"another_tbl")));
+            checker.assertCanInsert(Identifier.create("another_db", "my_tbl"));
+            checker.assertCanInsert(Identifier.create("another_db", 
"another_tbl"));
+
+            assertThrows(
+                    NoPrivilegeException.class,
+                    () ->
+                            testManager.grant(
+                                    "test", "another_db.another_tbl", 
PrivilegeType.SELECT));
+        }
+
+        rootManager.revoke("test", "another_db", PrivilegeType.INSERT);
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> rootManager.revoke("test2", "another_db", 
PrivilegeType.INSERT));
+
+        {
+            PrivilegeChecker checker = testManager.getPrivilegeChecker();
+
+            checker.assertCanSelect(Identifier.create("my_db", "my_tbl"));
+            checker.assertCanSelect(Identifier.create("my_db", "another_tbl"));
+            checker.assertCanSelect(Identifier.create("another_db", "my_tbl"));
+            assertThrows(
+                    NoPrivilegeException.class,
+                    () -> 
checker.assertCanSelect(Identifier.create("another_db", "another_tbl")));
+
+            assertThrows(
+                    NoPrivilegeException.class,
+                    () -> checker.assertCanInsert(Identifier.create("my_db", 
"my_tbl")));
+            assertThrows(
+                    NoPrivilegeException.class,
+                    () -> checker.assertCanInsert(Identifier.create("my_db", 
"another_tbl")));
+            assertThrows(
+                    NoPrivilegeException.class,
+                    () -> 
checker.assertCanInsert(Identifier.create("another_db", "my_tbl")));
+            assertThrows(
+                    NoPrivilegeException.class,
+                    () -> 
checker.assertCanInsert(Identifier.create("another_db", "another_tbl")));
+        }
+    }
+
+    private void initPrivilege() throws Exception {
+        getPrivilegeManager("anonymous", 
"anonymous").initializePrivilege(PASSWORD_ROOT);
+    }
+
+    private FileBasedPrivilegeManager getPrivilegeManager(String user, String 
password)
+            throws Exception {
+        return new FileBasedPrivilegeManager(
+                warehouse.toString(),
+                FileIO.get(warehouse, CatalogContext.create(warehouse)),
+                user,
+                password);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
index f4bdca414..a8d645c4b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.paimon.stats;
 
-import org.apache.paimon.AbstractFileStore;
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.io.DataFileMeta;
@@ -67,7 +67,7 @@ public class StatsTableTest extends TableTestBase {
                 GenericRow.of(2, 1, 1));
 
         FileStoreTable storeTable = (FileStoreTable) table;
-        AbstractFileStore<?> store = (AbstractFileStore<?>) storeTable.store();
+        FileStore<?> store = storeTable.store();
         String manifestListFile = 
storeTable.snapshotManager().latestSnapshot().deltaManifestList();
 
         ManifestList manifestList = store.manifestListFactory().create();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 138a30d5b..bccf0fe0e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -18,8 +18,8 @@
 
 package org.apache.paimon.table;
 
-import org.apache.paimon.AbstractFileStore;
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.TestFileStore;
 import org.apache.paimon.data.BinaryRow;
@@ -1182,7 +1182,7 @@ public abstract class FileStoreTableTestBase {
         SnapshotManager snapshotManager = table.snapshotManager();
         long latestSnapshotId = snapshotManager.latestSnapshotId();
 
-        AbstractFileStore<?> store = (AbstractFileStore<?>) table.store();
+        FileStore<?> store = table.store();
         Set<Path> filesInUse =
                 TestFileStore.getFilesInUse(
                         latestSnapshotId,
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index adf4adf2e..098fa17e9 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.CompactAction;
 import org.apache.paimon.flink.action.SortCompactAction;
@@ -78,8 +77,8 @@ public class CompactProcedure extends ProcedureBase {
             String orderByColumns,
             String tableOptions)
             throws Exception {
-        String warehouse = ((AbstractCatalog) catalog).warehouse();
-        Map<String, String> catalogOptions = ((AbstractCatalog) 
catalog).options();
+        String warehouse = catalog.warehouse();
+        Map<String, String> catalogOptions = catalog.options();
         Map<String, String> tableConf =
                 StringUtils.isBlank(tableOptions)
                         ? Collections.emptyMap()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index 168947813..4fea9b635 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.flink.action.CompactDatabaseAction;
 import org.apache.paimon.utils.StringUtils;
 
@@ -99,8 +98,8 @@ public class CompactDatabaseProcedure extends ProcedureBase {
             String excludingTables,
             String tableOptions)
             throws Exception {
-        String warehouse = ((AbstractCatalog) catalog).warehouse();
-        Map<String, String> catalogOptions = ((AbstractCatalog) 
catalog).options();
+        String warehouse = catalog.warehouse();
+        Map<String, String> catalogOptions = catalog.options();
         CompactDatabaseAction action =
                 new CompactDatabaseAction(warehouse, catalogOptions)
                         .includingDatabases(nullable(includingDatabases))
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 8a78ad3b8..01421c8bf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.CompactAction;
 import org.apache.paimon.flink.action.SortCompactAction;
@@ -62,8 +61,8 @@ public class CompactProcedure extends ProcedureBase {
             String orderByColumns,
             String tableOptions)
             throws Exception {
-        String warehouse = ((AbstractCatalog) catalog).warehouse();
-        Map<String, String> catalogOptions = ((AbstractCatalog) 
catalog).options();
+        String warehouse = catalog.warehouse();
+        Map<String, String> catalogOptions = catalog.options();
         Map<String, String> tableConf =
                 isBlank(tableOptions)
                         ? Collections.emptyMap()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
index c4236ac7d..acda2afd2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.MergeIntoAction;
 
@@ -180,8 +179,8 @@ public class MergeIntoProcedure extends ProcedureBase {
             String notMatchedInsertCondition,
             String notMatchedInsertValues,
             String matchedDeleteCondition) {
-        String warehouse = ((AbstractCatalog) catalog).warehouse();
-        Map<String, String> catalogOptions = ((AbstractCatalog) 
catalog).options();
+        String warehouse = catalog.warehouse();
+        Map<String, String> catalogOptions = catalog.options();
         Identifier identifier = Identifier.fromString(targetTableId);
         MergeIntoAction action =
                 new MergeIntoAction(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/CreatePrivilegedUserProcedure.java
similarity index 53%
copy from 
paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/CreatePrivilegedUserProcedure.java
index 8f4b6eede..76b1f6af8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/CreatePrivilegedUserProcedure.java
@@ -16,30 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.catalog;
+package org.apache.paimon.flink.procedure.privilege;
 
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.table.TableType;
+import org.apache.flink.table.procedure.ProcedureContext;
 
-import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
+/**
+ * Procedure to create a user for the privilege system. Only users with {@link
+ * org.apache.paimon.privilege.PrivilegeType#ADMIN} privilege can perform this 
operation. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.create_privileged_user('username', 'password')
+ * </code></pre>
+ */
+public class CreatePrivilegedUserProcedure extends PrivilegeProcedureBase {
 
-/** Factory to create {@link FileSystemCatalog}. */
-public class FileSystemCatalogFactory implements CatalogFactory {
+    public static final String IDENTIFIER = "create_privileged_user";
 
-    public static final String IDENTIFIER = "filesystem";
+    public String[] call(ProcedureContext procedureContext, String name, 
String password) {
+        getPrivilegedCatalog().createPrivilegedUser(name, password);
+        return new String[] {String.format("User %s is created without any 
privileges.", name)};
+    }
 
     @Override
     public String identifier() {
         return IDENTIFIER;
     }
-
-    @Override
-    public Catalog create(FileIO fileIO, Path warehouse, CatalogContext 
context) {
-        if (!TableType.MANAGED.equals(context.options().get(TABLE_TYPE))) {
-            throw new IllegalArgumentException(
-                    "Only managed table is supported in File system catalog.");
-        }
-        return new FileSystemCatalog(fileIO, warehouse, context.options());
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/DropPrivilegedUserProcedure.java
similarity index 53%
copy from 
paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/DropPrivilegedUserProcedure.java
index 8f4b6eede..3902a7f19 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/DropPrivilegedUserProcedure.java
@@ -16,30 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.catalog;
+package org.apache.paimon.flink.procedure.privilege;
 
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.table.TableType;
+import org.apache.flink.table.procedure.ProcedureContext;
 
-import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
+/**
+ * Procedure to drop a user from the privilege system. Only users with {@link
+ * org.apache.paimon.privilege.PrivilegeType#ADMIN} privilege can perform this 
operation. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.drop_privileged_user('username')
+ * </code></pre>
+ */
+public class DropPrivilegedUserProcedure extends PrivilegeProcedureBase {
 
-/** Factory to create {@link FileSystemCatalog}. */
-public class FileSystemCatalogFactory implements CatalogFactory {
+    public static final String IDENTIFIER = "drop_privileged_user";
 
-    public static final String IDENTIFIER = "filesystem";
+    public String[] call(ProcedureContext procedureContext, String name) {
+        getPrivilegedCatalog().dropPrivilegedUser(name);
+        return new String[] {String.format("User %s is dropped.", name)};
+    }
 
     @Override
     public String identifier() {
         return IDENTIFIER;
     }
-
-    @Override
-    public Catalog create(FileIO fileIO, Path warehouse, CatalogContext 
context) {
-        if (!TableType.MANAGED.equals(context.options().get(TABLE_TYPE))) {
-            throw new IllegalArgumentException(
-                    "Only managed table is supported in File system catalog.");
-        }
-        return new FileSystemCatalog(fileIO, warehouse, context.options());
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/GrantPrivilegeToUserProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/GrantPrivilegeToUserProcedure.java
new file mode 100644
index 000000000..a647f36bc
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/GrantPrivilegeToUserProcedure.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.privilege.PrivilegeType;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Procedure to grant privilege to a user. Privilege can be granted on the 
whole catalog, a database
+ * or a table. Only users with {@link 
org.apache.paimon.privilege.PrivilegeType#ADMIN} privilege can
+ * perform this operation. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.grant_privilege_to_user('username', 'privilege')
+ *  CALL sys.grant_privilege_to_user('username', 'privilege', 'database')
+ *  CALL sys.grant_privilege_to_user('username', 'privilege', 'database', 
'table')
+ * </code></pre>
+ */
+public class GrantPrivilegeToUserProcedure extends PrivilegeProcedureBase {
+
+    public static final String IDENTIFIER = "grant_privilege_to_user";
+
+    public String[] call(ProcedureContext procedureContext, String user, 
String privilege) {
+        getPrivilegedCatalog().grantPrivilegeOnCatalog(user, 
PrivilegeType.valueOf(privilege));
+        return new String[] {
+            String.format("User %s is granted with privilege %s on the 
catalog.", user, privilege)
+        };
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext, String user, String privilege, 
String database) {
+        getPrivilegedCatalog()
+                .grantPrivilegeOnDatabase(user, database, 
PrivilegeType.valueOf(privilege));
+        return new String[] {
+            String.format(
+                    "User %s is granted with privilege %s on database %s.",
+                    user, privilege, database)
+        };
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String user,
+            String privilege,
+            String database,
+            String table) {
+        Identifier identifier = Identifier.create(database, table);
+        getPrivilegedCatalog()
+                .grantPrivilegeOnTable(user, identifier, 
PrivilegeType.valueOf(privilege));
+        return new String[] {
+            String.format(
+                    "User %s is granted with privilege %s on table %s.",
+                    user, privilege, identifier)
+        };
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
new file mode 100644
index 000000000..7d71678a8
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure.privilege;
+
+import org.apache.paimon.flink.procedure.ProcedureBase;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.privilege.FileBasedPrivilegeManager;
+import org.apache.paimon.privilege.PrivilegeManager;
+import org.apache.paimon.privilege.PrivilegedCatalog;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Procedure to initialize file-based privilege system in warehouse. This 
procedure will
+ * automatically create a root user with the provided password. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.init_file_based_privilege('rootPassword')
+ * </code></pre>
+ */
+public class InitFileBasedPrivilegeProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "init_file_based_privilege";
+
+    private static final List<String> SUPPORTED_CATALOGS =
+            Arrays.asList(
+                    "org.apache.paimon.catalog.FileSystemCatalog",
+                    "org.apache.paimon.hive.HiveCatalog");
+
+    public String[] call(ProcedureContext procedureContext, String 
rootPassword) {
+        if (catalog instanceof PrivilegedCatalog) {
+            throw new IllegalArgumentException("Catalog is already a 
PrivilegedCatalog");
+        } else if (!SUPPORTED_CATALOGS.contains(catalog.getClass().getName())) 
{
+            throw new IllegalArgumentException(
+                    "File based privilege system does not support " + 
catalog.getClass().getName());
+        }
+
+        Options options = new Options(catalog.options());
+        PrivilegeManager privilegeManager =
+                new FileBasedPrivilegeManager(
+                        catalog.warehouse(),
+                        catalog.fileIO(),
+                        options.get(PrivilegedCatalog.USER),
+                        options.get(PrivilegedCatalog.PASSWORD));
+        privilegeManager.initializePrivilege(rootPassword);
+        return new String[] {
+            "Privilege system is successfully enabled. Please drop and 
re-create the catalog."
+        };
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureBase.java
new file mode 100644
index 000000000..1bc9a3ba8
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure.privilege;
+
+import org.apache.paimon.flink.procedure.ProcedureBase;
+import org.apache.paimon.privilege.PrivilegedCatalog;
+
+/** Base class for privilege-related procedures. */
+public abstract class PrivilegeProcedureBase extends ProcedureBase {
+
+    protected PrivilegedCatalog getPrivilegedCatalog() {
+        if (catalog instanceof PrivilegedCatalog) {
+            return (PrivilegedCatalog) catalog;
+        } else {
+            throw new UnsupportedOperationException(
+                    "The catalog you specified does not support privilege 
system");
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/RevokePrivilegeFromUserProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/RevokePrivilegeFromUserProcedure.java
new file mode 100644
index 000000000..d4834ac60
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/RevokePrivilegeFromUserProcedure.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.privilege.PrivilegeType;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Procedure to revoke privilege from a user. Privilege can be revoked from 
the whole catalog, a
+ * database or a table. Only users with {@link 
org.apache.paimon.privilege.PrivilegeType#ADMIN}
+ * privilege can perform this operation. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.revoke_privilege_from_user('username', 'privilege')
+ *  CALL sys.revoke_privilege_from_user('username', 'privilege', 'database')
+ *  CALL sys.revoke_privilege_from_user('username', 'privilege', 'database', 
'table')
+ * </code></pre>
+ */
+public class RevokePrivilegeFromUserProcedure extends PrivilegeProcedureBase {
+
+    public static final String IDENTIFIER = "revoke_privilege_from_user";
+
+    public String[] call(ProcedureContext procedureContext, String user, 
String privilege) {
+        int count =
+                getPrivilegedCatalog()
+                        .revokePrivilegeOnCatalog(user, 
PrivilegeType.valueOf(privilege));
+        return new String[] {
+            String.format("User %s is revoked with privilege %s on the 
catalog.", user, privilege),
+            "Number of privileges revoked: " + count
+        };
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext, String user, String privilege, 
String database) {
+        int count =
+                getPrivilegedCatalog()
+                        .revokePrivilegeOnDatabase(
+                                user, database, 
PrivilegeType.valueOf(privilege));
+        return new String[] {
+            String.format(
+                    "User %s is revoked with privilege %s on database %s.",
+                    user, privilege, database),
+            "Number of privileges revoked: " + count
+        };
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String user,
+            String privilege,
+            String database,
+            String table) {
+        Identifier identifier = Identifier.create(database, table);
+        int count =
+                getPrivilegedCatalog()
+                        .revokePrivilegeOnTable(user, identifier, 
PrivilegeType.valueOf(privilege));
+        return new String[] {
+            String.format(
+                    "User %s is revoked with privilege %s on table %s.",
+                    user, privilege, identifier),
+            "Number of privileges revoked: " + count
+        };
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index fe463161b..943da3e16 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -44,4 +44,9 @@ org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
 org.apache.paimon.flink.procedure.MigrateFileProcedure
 org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
 org.apache.paimon.flink.procedure.QueryServiceProcedure
-org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
+org.apache.paimon.flink.procedure.privilege.InitFileBasedPrivilegeProcedure
+org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure
+org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
+org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
+org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
new file mode 100644
index 000000000..d432c02ea
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure.privilege;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.privilege.NoPrivilegeException;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for privilege related procedures. */
+public class PrivilegeProcedureITCase extends AbstractTestBase {
+
+    private String path;
+
+    @BeforeEach
+    public void beforeEach() {
+        path = getTempDirPath();
+    }
+
+    @Test
+    public void testUserPrivileges() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG mycat WITH (\n"
+                                + "  'type' = 'paimon',\n"
+                                + "  'warehouse' = '%s'\n"
+                                + ")",
+                        path));
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql("CREATE DATABASE mydb");
+        tEnv.executeSql("CREATE DATABASE mydb2");
+        tEnv.executeSql(
+                "CREATE TABLE mydb.T1 (\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ")");
+        tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 10), (2, 20), (3, 
30)").await();
+        tEnv.executeSql("CALL sys.init_file_based_privilege('root-passwd')");
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG anonymouscat WITH (\n"
+                                + "  'type' = 'paimon',\n"
+                                + "  'warehouse' = '%s'\n"
+                                + ")",
+                        path));
+        tEnv.executeSql("USE CATALOG anonymouscat");
+        assertNoPrivilege(
+                () -> tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 11), (2, 
21)").await());
+        assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY 
k"));
+        assertNoPrivilege(() -> tEnv.executeSql("CREATE TABLE mydb.S1 ( a INT, 
b INT )"));
+        assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE mydb.T1"));
+        assertNoPrivilege(() -> tEnv.executeSql("ALTER TABLE mydb.T1 RENAME TO 
mydb.T2"));
+        assertNoPrivilege(() -> tEnv.executeSql("CREATE DATABASE anotherdb"));
+        assertNoPrivilege(() -> tEnv.executeSql("DROP DATABASE mydb CASCADE"));
+        assertNoPrivilege(
+                () -> tEnv.executeSql("CALL 
sys.create_privileged_user('test2', 'test2-passwd')"));
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG rootcat WITH (\n"
+                                + "  'type' = 'paimon',\n"
+                                + "  'warehouse' = '%s',\n"
+                                + "  'user' = 'root',\n"
+                                + "  'password' = 'root-passwd'\n"
+                                + ")",
+                        path));
+        tEnv.executeSql("USE CATALOG rootcat");
+        tEnv.executeSql(
+                "CREATE TABLE mydb2.T2 (\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ")");
+        tEnv.executeSql("INSERT INTO mydb2.T2 VALUES (100, 1000), (200, 2000), 
(300, 3000)")
+                .await();
+        tEnv.executeSql("CALL sys.create_privileged_user('test', 
'test-passwd')");
+        tEnv.executeSql("CALL sys.grant_privilege_to_user('test', 
'CREATE_TABLE', 'mydb')");
+        tEnv.executeSql("CALL sys.grant_privilege_to_user('test', 'SELECT', 
'mydb')");
+        tEnv.executeSql("CALL sys.grant_privilege_to_user('test', 'INSERT', 
'mydb')");
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG testcat WITH (\n"
+                                + "  'type' = 'paimon',\n"
+                                + "  'warehouse' = '%s',\n"
+                                + "  'user' = 'test',\n"
+                                + "  'password' = 'test-passwd'\n"
+                                + ")",
+                        path));
+        tEnv.executeSql("USE CATALOG testcat");
+        tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 12), (2, 22)").await();
+        assertThat(collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY k"))
+                .isEqualTo(Arrays.asList(Row.of(1, 12), Row.of(2, 22), 
Row.of(3, 30)));
+        tEnv.executeSql("CREATE TABLE mydb.S1 ( a INT, b INT )");
+        tEnv.executeSql("INSERT INTO mydb.S1 VALUES (1, 100), (2, 200), (3, 
300)").await();
+        assertThat(collect(tEnv, "SELECT * FROM mydb.S1 ORDER BY a"))
+                .isEqualTo(Arrays.asList(Row.of(1, 100), Row.of(2, 200), 
Row.of(3, 300)));
+        assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE mydb.T1"));
+        assertNoPrivilege(() -> tEnv.executeSql("ALTER TABLE mydb.T1 RENAME TO 
mydb.T2"));
+        assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE mydb.S1"));
+        assertNoPrivilege(() -> tEnv.executeSql("ALTER TABLE mydb.S1 RENAME TO 
mydb.S2"));
+        assertNoPrivilege(() -> tEnv.executeSql("CREATE DATABASE anotherdb"));
+        assertNoPrivilege(() -> tEnv.executeSql("DROP DATABASE mydb CASCADE"));
+        assertNoPrivilege(
+                () -> tEnv.executeSql("CALL 
sys.create_privileged_user('test2', 'test2-passwd')"));
+
+        tEnv.executeSql("USE CATALOG rootcat");
+        tEnv.executeSql("CALL sys.create_privileged_user('test2', 
'test2-passwd')");
+        tEnv.executeSql("CALL sys.grant_privilege_to_user('test2', 'SELECT', 
'mydb2')");
+        tEnv.executeSql("CALL sys.grant_privilege_to_user('test2', 'INSERT', 
'mydb', 'T1')");
+        tEnv.executeSql("CALL sys.grant_privilege_to_user('test2', 'SELECT', 
'mydb', 'S1')");
+        tEnv.executeSql("CALL sys.grant_privilege_to_user('test2', 
'CREATE_DATABASE')");
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG test2cat WITH (\n"
+                                + "  'type' = 'paimon',\n"
+                                + "  'warehouse' = '%s',\n"
+                                + "  'user' = 'test2',\n"
+                                + "  'password' = 'test2-passwd'\n"
+                                + ")",
+                        path));
+        tEnv.executeSql("USE CATALOG test2cat");
+        tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 13), (2, 23)").await();
+        assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY 
k"));
+        assertNoPrivilege(() -> tEnv.executeSql("CREATE TABLE mydb.S2 ( a INT, 
b INT )"));
+        assertNoPrivilege(
+                () ->
+                        tEnv.executeSql("INSERT INTO mydb.S1 VALUES (1, 100), 
(2, 200), (3, 300)")
+                                .await());
+        assertThat(collect(tEnv, "SELECT * FROM mydb.S1 ORDER BY a"))
+                .isEqualTo(Arrays.asList(Row.of(1, 100), Row.of(2, 200), 
Row.of(3, 300)));
+        assertNoPrivilege(
+                () ->
+                        tEnv.executeSql(
+                                        "INSERT INTO mydb2.T2 VALUES (100, 
1001), (200, 2001), (300, 3001)")
+                                .await());
+        assertThat(collect(tEnv, "SELECT * FROM mydb2.T2 ORDER BY k"))
+                .isEqualTo(Arrays.asList(Row.of(100, 1000), Row.of(200, 2000), 
Row.of(300, 3000)));
+        tEnv.executeSql("CREATE DATABASE anotherdb");
+        assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE mydb.T1"));
+        assertNoPrivilege(() -> tEnv.executeSql("ALTER TABLE mydb.T1 RENAME TO 
mydb.T2"));
+        assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE mydb.S1"));
+        assertNoPrivilege(() -> tEnv.executeSql("ALTER TABLE mydb.S1 RENAME TO 
mydb.S2"));
+        assertNoPrivilege(() -> tEnv.executeSql("DROP DATABASE mydb CASCADE"));
+        assertNoPrivilege(
+                () -> tEnv.executeSql("CALL 
sys.create_privileged_user('test3', 'test3-passwd')"));
+
+        tEnv.executeSql("USE CATALOG rootcat");
+        assertThat(collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY k"))
+                .isEqualTo(Arrays.asList(Row.of(1, 13), Row.of(2, 23), 
Row.of(3, 30)));
+        tEnv.executeSql("CALL sys.revoke_privilege_from_user('test2', 
'SELECT')");
+        tEnv.executeSql("CALL sys.drop_privileged_user('test')");
+
+        tEnv.executeSql("USE CATALOG testcat");
+        Exception e =
+                assertThrows(
+                        Exception.class, () -> collect(tEnv, "SELECT * FROM 
mydb.T1 ORDER BY k"));
+        assertThat(e).hasRootCauseMessage("User test not found, or password 
incorrect.");
+
+        tEnv.executeSql("USE CATALOG test2cat");
+        assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb.S1 ORDER BY 
a"));
+        assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb2.T2 ORDER BY 
k"));
+        tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 14), (2, 24)").await();
+
+        tEnv.executeSql("USE CATALOG rootcat");
+        assertThat(collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY k"))
+                .isEqualTo(Arrays.asList(Row.of(1, 14), Row.of(2, 24), 
Row.of(3, 30)));
+        tEnv.executeSql("DROP DATABASE mydb CASCADE");
+        tEnv.executeSql("DROP DATABASE mydb2 CASCADE");
+    }
+
+    @Test
+    public void testDropUser() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        initializeSingleUserTest(tEnv);
+
+        tEnv.executeSql("USE CATALOG rootcat");
+        tEnv.executeSql("CALL sys.drop_privileged_user('test')");
+        tEnv.executeSql("CALL sys.create_privileged_user('test', 
'test-passwd')");
+
+        tEnv.executeSql("USE CATALOG testcat");
+        assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY 
k"));
+        assertNoPrivilege(
+                () -> tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 12), (2, 
22)").await());
+    }
+
+    @Test
+    public void testDropObject() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        initializeSingleUserTest(tEnv);
+
+        tEnv.executeSql("USE CATALOG rootcat");
+        tEnv.executeSql("DROP TABLE mydb.T1");
+        tEnv.executeSql(
+                "CREATE TABLE mydb.T1 (\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ")");
+
+        tEnv.executeSql("USE CATALOG testcat");
+        assertNoPrivilege(() -> collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY 
k"));
+        assertNoPrivilege(
+                () -> tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 12), (2, 
22)").await());
+    }
+
+    @Test
+    public void testRenameObject() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        initializeSingleUserTest(tEnv);
+
+        tEnv.executeSql("USE CATALOG rootcat");
+        tEnv.executeSql("ALTER TABLE mydb.T1 RENAME TO mydb.T2");
+
+        tEnv.executeSql("USE CATALOG testcat");
+        assertThat(collect(tEnv, "SELECT * FROM mydb.T2 ORDER BY k"))
+                .isEqualTo(Arrays.asList(Row.of(1, 11), Row.of(2, 21), 
Row.of(3, 30)));
+        tEnv.executeSql("INSERT INTO mydb.T2 VALUES (1, 12), (2, 22)").await();
+        assertThat(collect(tEnv, "SELECT * FROM mydb.T2 ORDER BY k"))
+                .isEqualTo(Arrays.asList(Row.of(1, 12), Row.of(2, 22), 
Row.of(3, 30)));
+    }
+
+    private void initializeSingleUserTest(TableEnvironment tEnv) throws 
Exception {
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG mycat WITH (\n"
+                                + "  'type' = 'paimon',\n"
+                                + "  'warehouse' = '%s'\n"
+                                + ")",
+                        path));
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql("CREATE DATABASE mydb");
+        tEnv.executeSql(
+                "CREATE TABLE mydb.T1 (\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ")");
+        tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 10), (2, 20), (3, 
30)").await();
+        tEnv.executeSql("CALL sys.init_file_based_privilege('root-passwd')");
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG rootcat WITH (\n"
+                                + "  'type' = 'paimon',\n"
+                                + "  'warehouse' = '%s',\n"
+                                + "  'user' = 'root',\n"
+                                + "  'password' = 'root-passwd'\n"
+                                + ")",
+                        path));
+        tEnv.executeSql("USE CATALOG rootcat");
+        tEnv.executeSql("CALL sys.create_privileged_user('test', 
'test-passwd')");
+        tEnv.executeSql("CALL sys.grant_privilege_to_user('test', 'SELECT', 
'mydb', 'T1')");
+        tEnv.executeSql("CALL sys.grant_privilege_to_user('test', 'INSERT', 
'mydb', 'T1')");
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG testcat WITH (\n"
+                                + "  'type' = 'paimon',\n"
+                                + "  'warehouse' = '%s',\n"
+                                + "  'user' = 'test',\n"
+                                + "  'password' = 'test-passwd'\n"
+                                + ")",
+                        path));
+        tEnv.executeSql("USE CATALOG testcat");
+        assertThat(collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY k"))
+                .isEqualTo(Arrays.asList(Row.of(1, 10), Row.of(2, 20), 
Row.of(3, 30)));
+        tEnv.executeSql("INSERT INTO mydb.T1 VALUES (1, 11), (2, 21)").await();
+        assertThat(collect(tEnv, "SELECT * FROM mydb.T1 ORDER BY k"))
+                .isEqualTo(Arrays.asList(Row.of(1, 11), Row.of(2, 21), 
Row.of(3, 30)));
+    }
+
+    private List<Row> collect(TableEnvironment tEnv, String sql) throws 
Exception {
+        List<Row> result = new ArrayList<>();
+        try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+            while (it.hasNext()) {
+                result.add(it.next());
+            }
+        }
+        return result;
+    }
+
+    private void assertNoPrivilege(Executable executable) {
+        Exception e = assertThrows(Exception.class, executable);
+        if (e.getCause() != null) {
+            assertThat(e).hasRootCauseInstanceOf(NoPrivilegeException.class);
+        } else {
+            assertThat(e).isInstanceOf(NoPrivilegeException.class);
+        }
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 32d25e7db..06dbc5a2b 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -33,6 +33,9 @@ import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.options.OptionsUtils;
+import org.apache.paimon.privilege.FileBasedPrivilegeManager;
+import org.apache.paimon.privilege.PrivilegeManager;
+import org.apache.paimon.privilege.PrivilegedCatalog;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -684,12 +687,26 @@ public class HiveCatalog extends AbstractCatalog {
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
-        return new HiveCatalog(
-                fileIO,
-                hiveConf,
-                options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
-                options,
-                warehouse.toUri().toString());
+
+        Catalog catalog =
+                new HiveCatalog(
+                        fileIO,
+                        hiveConf,
+                        options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
+                        options,
+                        warehouse.toUri().toString());
+
+        PrivilegeManager privilegeManager =
+                new FileBasedPrivilegeManager(
+                        warehouse.toString(),
+                        fileIO,
+                        context.options().get(PrivilegedCatalog.USER),
+                        context.options().get(PrivilegedCatalog.PASSWORD));
+        if (privilegeManager.privilegeEnabled()) {
+            catalog = new PrivilegedCatalog(catalog, privilegeManager);
+        }
+
+        return catalog;
     }
 
     public static HiveConf createHiveConf(CatalogContext context) {
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
index cc51df265..95da00371 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java
@@ -24,16 +24,11 @@ import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
 
 /** Factory to create {@link HiveCatalog}. */
 public class HiveCatalogFactory implements CatalogFactory {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalogFactory.class);
-
     public static final ConfigOption<String> METASTORE_CLIENT_CLASS =
             ConfigOptions.key("metastore.client.class")
                     .stringType()
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 4d9753bab..a5c494c1e 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.hive.annotation.Minio;
 import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
+import org.apache.paimon.privilege.NoPrivilegeException;
 import org.apache.paimon.s3.MinioTestContainer;
 
 import com.klarna.hiverunner.HiveShell;
@@ -43,6 +44,7 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.function.Executable;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
@@ -67,6 +69,7 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** IT cases for using Paimon {@link HiveCatalog} together with Paimon Hive 
connector. */
 @RunWith(PaimonEmbeddedHiveRunner.class)
@@ -76,6 +79,7 @@ public abstract class HiveCatalogITCaseBase {
 
     protected String path;
     protected TableEnvironment tEnv;
+    private boolean locationInProperties;
 
     @HiveSQL(files = {})
     protected static HiveShell hiveShell;
@@ -83,26 +87,41 @@ public abstract class HiveCatalogITCaseBase {
     @Minio private static MinioTestContainer minioTestContainer;
 
     private void before(boolean locationInProperties) throws Exception {
-        Map<String, String> catalogProperties = new HashMap<>();
+        this.locationInProperties = locationInProperties;
+        if (locationInProperties) {
+            path = minioTestContainer.getS3UriForDefaultBucket() + "/" + 
UUID.randomUUID();
+        } else {
+            path = folder.newFolder().toURI().toString();
+        }
+        registerHiveCatalog("my_hive", new HashMap<>());
+
+        tEnv.executeSql("USE CATALOG my_hive").await();
+        tEnv.executeSql("DROP DATABASE IF EXISTS test_db CASCADE");
+        tEnv.executeSql("CREATE DATABASE test_db").await();
+        tEnv.executeSql("USE test_db").await();
+        hiveShell.execute("USE test_db");
+        hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
+        hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200, 
'Table')");
+    }
+
+    private void registerHiveCatalog(String catalogName, Map<String, String> 
catalogProperties)
+            throws Exception {
         catalogProperties.put("type", "paimon");
         catalogProperties.put("metastore", "hive");
         catalogProperties.put("uri", "");
         catalogProperties.put("lock.enabled", "true");
         catalogProperties.put("location-in-properties", 
String.valueOf(locationInProperties));
+        catalogProperties.put("warehouse", path);
         if (locationInProperties) {
-            path = minioTestContainer.getS3UriForDefaultBucket() + "/" + 
UUID.randomUUID();
             catalogProperties.putAll(minioTestContainer.getS3ConfigOptions());
-        } else {
-            path = folder.newFolder().toURI().toString();
         }
-        catalogProperties.put("warehouse", path);
 
         EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
         tEnv = TableEnvironmentImpl.create(settings);
         tEnv.executeSql(
                         String.join(
                                 "\n",
-                                "CREATE CATALOG my_hive WITH (",
+                                "CREATE CATALOG " + catalogName + " WITH (",
                                 catalogProperties.entrySet().stream()
                                         .map(
                                                 e ->
@@ -112,13 +131,6 @@ public abstract class HiveCatalogITCaseBase {
                                         .collect(Collectors.joining(",\n")),
                                 ")"))
                 .await();
-        tEnv.executeSql("USE CATALOG my_hive").await();
-        tEnv.executeSql("DROP DATABASE IF EXISTS test_db CASCADE");
-        tEnv.executeSql("CREATE DATABASE test_db").await();
-        tEnv.executeSql("USE test_db").await();
-        hiveShell.execute("USE test_db");
-        hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
-        hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200, 
'Table')");
     }
 
     private void after() {
@@ -1034,6 +1046,41 @@ public abstract class HiveCatalogITCaseBase {
         }
     }
 
+    @Test
+    public void testFileBasedPrivilege() throws Exception {
+        tEnv.executeSql("CREATE TABLE t ( a INT, b INT )");
+        tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
+        tEnv.executeSql("CALL sys.init_file_based_privilege('root-passwd')");
+
+        Map<String, String> rootCatalogProperties = new HashMap<>();
+        rootCatalogProperties.put("user", "root");
+        rootCatalogProperties.put("password", "root-passwd");
+        registerHiveCatalog("my_hive_root", rootCatalogProperties);
+        tEnv.executeSql("USE CATALOG my_hive_root");
+        tEnv.executeSql("CALL sys.create_privileged_user('test', 
'test-passwd')");
+        tEnv.executeSql("CALL sys.grant_privilege_to_user('test', 'SELECT', 
'test_db')");
+
+        Map<String, String> testCatalogProperties = new HashMap<>();
+        testCatalogProperties.put("user", "test");
+        testCatalogProperties.put("password", "test-passwd");
+        registerHiveCatalog("my_hive_test", testCatalogProperties);
+        tEnv.executeSql("USE CATALOG my_hive_test");
+        tEnv.executeSql("USE test_db");
+        assertThat(collect("SELECT * FROM t ORDER BY a"))
+                .containsExactly(Row.of(1, 10), Row.of(2, 20));
+        assertNoPrivilege(() -> tEnv.executeSql("INSERT INTO t VALUES (3, 
30)").await());
+        assertNoPrivilege(() -> tEnv.executeSql("DROP TABLE t").await());
+    }
+
+    private void assertNoPrivilege(Executable executable) {
+        Exception e = assertThrows(Exception.class, executable);
+        if (e.getCause() != null) {
+            assertThat(e).hasRootCauseInstanceOf(NoPrivilegeException.class);
+        } else {
+            assertThat(e).isInstanceOf(NoPrivilegeException.class);
+        }
+    }
+
     protected List<Row> collect(String sql) throws Exception {
         List<Row> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {


Reply via email to