This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 7acc24d74c  [#9377]  feat(python-client): Add Role-related Interface. 
(#9378)
7acc24d74c is described below

commit 7acc24d74cff700b839dfb4a3dd31c56954b680f
Author: Lord of Abyss <[email protected]>
AuthorDate: Wed Feb 4 20:44:38 2026 +0800

     [#9377]  feat(python-client): Add Role-related Interface. (#9378)
    
    ### What changes were proposed in this pull request?
    
    Add Role-related Interface.
    
    ### Why are the changes needed?
    
    Fix: #9377
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    local test.
---
 .../gravitino/api/authorization/__init__.py        |  28 ++
 .../gravitino/api/authorization/privileges.py      | 223 +++++++++++
 .../gravitino/api/authorization/role.py            |  70 ++++
 .../api/authorization/securable_objects.py         | 429 +++++++++++++++++++++
 .../gravitino/api/authorization/supports_roles.py  |  39 ++
 .../client-python/gravitino/api/metadata_object.py |  12 +
 .../gravitino/api/metadata_objects.py              |   4 +
 .../tests/unittests/authorization/__init__.py      |  16 +
 .../authorization/test_securable_objects.py        | 385 ++++++++++++++++++
 9 files changed, 1206 insertions(+)

diff --git a/clients/client-python/gravitino/api/authorization/__init__.py 
b/clients/client-python/gravitino/api/authorization/__init__.py
new file mode 100644
index 0000000000..a593b8d584
--- /dev/null
+++ b/clients/client-python/gravitino/api/authorization/__init__.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from gravitino.api.authorization.privileges import Privileges
+from gravitino.api.authorization.role import Role
+from gravitino.api.authorization.securable_objects import SecurableObjects
+
+__all__ = [
+    "Role",
+    "SecurableObjects",
+    "Privileges",
+]
diff --git a/clients/client-python/gravitino/api/authorization/privileges.py 
b/clients/client-python/gravitino/api/authorization/privileges.py
new file mode 100644
index 0000000000..36e57d8407
--- /dev/null
+++ b/clients/client-python/gravitino/api/authorization/privileges.py
@@ -0,0 +1,223 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import TYPE_CHECKING
+
+from gravitino.api.metadata_object import MetadataObject
+
+if TYPE_CHECKING:
+    pass
+
+
+class Privilege(ABC):
+    """The interface of a privilege. The privilege represents the ability to 
execute
+    kinds of operations for kinds of entities.
+    """
+
+    @abstractmethod
+    def name(self) -> Privilege.Name:
+        """
+        Return the generic name of the privilege.
+
+        Raises:
+            NotImplementedError: If the method is not implemented.
+
+        Returns:
+            Privilege.Name: The generic name of the privilege.
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def simple_string(self) -> str:
+        """
+        Return a simple string representation of the privilege.
+
+        Raises:
+            NotImplementedError: If the method is not implemented.
+
+        Returns:
+            str: A readable string representation for the privilege.
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def condition(self) -> "Privilege.Condition":
+        """
+        Return the condition of the privilege.
+
+        Raises:
+            NotImplementedError: If the method is not implemented.
+
+        Returns:
+            Privilege.Condition: The condition of the privilege.
+            `ALLOW` means that you are allowed to use the privilege,
+            `DENY` means that you are denied to use the privilege.
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def can_bind_to(self, obj_type: MetadataObject.Type) -> bool:
+        """
+        Check whether this privilege can bind to a securable object type.
+
+        Args:
+            obj_type: The securable object's metadata type.
+
+        Returns:
+            True if this privilege can bind to the given type, otherwise False.
+        """
+        raise NotImplementedError()
+
+    class Name(Enum):
+        """The name of this privilege."""
+
+        CREATE_CATALOG = (0, 1 << 0)
+        """The privilege to create a catalog."""
+
+        USE_CATALOG = (0, 1 << 2)
+        """The privilege to use a catalog."""
+
+        CREATE_SCHEMA = (0, 1 << 3)
+        """The privilege to create a schema."""
+
+        USE_SCHEMA = (0, 1 << 4)
+        """The privilege to use a schema."""
+
+        CREATE_TABLE = (0, 1 << 5)
+        """The privilege to create a table."""
+
+        MODIFY_TABLE = (0, 1 << 6)
+        """The privilege to modify a table."""
+
+        SELECT_TABLE = (0, 1 << 7)
+        """The privilege to select data from a table."""
+
+        CREATE_FILESET = (0, 1 << 8)
+        """The privilege to create a fileset."""
+
+        WRITE_FILESET = (0, 1 << 9)
+        """The privilege to write a fileset."""
+
+        READ_FILESET = (0, 1 << 10)
+        """The privilege to read a fileset."""
+
+        CREATE_TOPIC = (0, 1 << 11)
+        """The privilege to create a topic."""
+
+        PRODUCE_TOPIC = (0, 1 << 12)
+        """The privilege to produce to a topic."""
+
+        CONSUME_TOPIC = (0, 1 << 13)
+        """The privilege to consume from a topic."""
+
+        MANAGE_USERS = (0, 1 << 14)
+        """The privilege to manage users."""
+
+        MANAGE_GROUPS = (0, 1 << 15)
+        """The privilege to manage groups."""
+
+        CREATE_ROLE = (0, 1 << 16)
+        """The privilege to create a role."""
+
+        MANAGE_GRANTS = (0, 1 << 17)
+        """The privilege to grant or revoke a role for the user or the 
group."""
+
+        REGISTER_MODEL = (0, 1 << 18)
+        """The privilege to create a model."""
+
+        LINK_MODEL_VERSION = (0, 1 << 19)
+        """The privilege to create a model version."""
+
+        USE_MODEL = (0, 1 << 20)
+        """The privilege to view model metadata and download all model 
versions."""
+
+        CREATE_TAG = (0, 1 << 21)
+        """The privilege to create a tag."""
+
+        APPLY_TAG = (0, 1 << 22)
+        """The privilege to apply a tag."""
+
+        CREATE_POLICY = (0, 1 << 23)
+        """The privilege to create a policy."""
+
+        APPLY_POLICY = (0, 1 << 24)
+        """The privilege to apply a policy."""
+
+        REGISTER_JOB_TEMPLATE = (0, 1 << 25)
+        """The privilege to register a job template."""
+
+        USE_JOB_TEMPLATE = (0, 1 << 26)
+        """The privilege to use a job template."""
+
+        RUN_JOB = (0, 1 << 27)
+        """The privilege to run a job."""
+
+        def __init__(self, high_bits: int, low_bits: int) -> None:
+            """
+            Initialize the Name with the high and low bits.
+
+            Args:
+                high_bits (int): The high bits of the privilege.
+                low_bits (int): The low bits of the privilege.
+            """
+            self._high_bits = high_bits
+            self._low_bits = low_bits
+
+        @property
+        def low_bits(self) -> int:
+            """
+            Return the low bits of Name.
+
+            Returns:
+                int: The low bits of Name
+            """
+
+            return self._low_bits
+
+        @property
+        def high_bits(self) -> int:
+            """
+            Return the high bits of Name.
+
+            Returns:
+                int: The high bits of Name
+            """
+            return self._high_bits
+
+    class Condition(Enum):
+        """
+        The condition of this privilege.
+
+        ALLOW means that you are allowed to use the privilege.
+        DENY means that you are denied to use the privilege.
+
+        If you have ALLOW and DENY for the same privilege name of the same
+        securable object, the DENY will take effect.
+        """
+
+        # Allow to use the privilege
+        ALLOW = "ALLOW"
+        # Deny to use the privilege
+        DENY = "DENY"
+
+
+class Privileges:
+    # TODO Implement the Privileges class.
+    pass
diff --git a/clients/client-python/gravitino/api/authorization/role.py 
b/clients/client-python/gravitino/api/authorization/role.py
new file mode 100644
index 0000000000..ffdb5f3689
--- /dev/null
+++ b/clients/client-python/gravitino/api/authorization/role.py
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from abc import abstractmethod
+
+from gravitino.api.auditable import Auditable
+from gravitino.api.authorization.securable_objects import SecurableObject
+
+
+class Role(Auditable):
+    """The interface of a role. The role is the entity which has kinds of 
privileges. One role can have
+    multiple privileges of multiple securable objects.
+    """
+
+    @abstractmethod
+    def name(self) -> str:
+        """
+        The name of the role.
+
+        Raises:
+            NotImplementedError: if the method is not implemented.
+
+        Returns:
+            str: The name of the role.
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def properties(self) -> dict[str, str]:
+        """
+        The properties of the role. Note, this method will return None if the 
properties are not set.
+
+        Raises:
+            NotImplementedError: if the method is not implemented.
+
+        Returns:
+            dict[str, str]: The properties of the role.
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def securable_objects(self) -> list[SecurableObject]:
+        """
+        The securable object represents a special kind of entity with a unique 
identifier. All
+        securable objects are organized by tree structure. For example: If the 
securable object is a
+        table, the identifier may be `catalog1.schema1.table1`.
+
+        Raises:
+            NotImplementedError: if the method is not implemented.
+
+        Returns:
+            list[SecurableObject]: The securable objects of the role.
+        """
+        raise NotImplementedError()
diff --git 
a/clients/client-python/gravitino/api/authorization/securable_objects.py 
b/clients/client-python/gravitino/api/authorization/securable_objects.py
new file mode 100644
index 0000000000..491d637d5a
--- /dev/null
+++ b/clients/client-python/gravitino/api/authorization/securable_objects.py
@@ -0,0 +1,429 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from collections import Counter
+from collections.abc import Collection
+
+from gravitino.api.authorization.privileges import Privilege
+from gravitino.api.metadata_object import MetadataObject
+from gravitino.api.metadata_objects import MetadataObjects
+
+
+class SecurableObject(MetadataObject, ABC):
+    """
+    A securable object is an entity on which access control can be granted.
+    Unless explicitly granted, access is denied.
+
+    Apache Gravitino organizes securable objects in a tree structure.
+    Each securable object contains three attributes: parent, name, and type.
+
+    Supported types:
+        - CATALOG
+        - SCHEMA
+        - TABLE
+        - FILESET
+        - TOPIC
+        - METALAKE
+
+    Use the helper class `SecurableObjects` to construct the securable object 
you need.
+
+    In RESTful APIs, you can reference a securable object using its full name 
and type.
+
+    Examples
+    --------
+    In the Python snippets below, `privileges` is a list of `Privilege`
+    objects, `catalog` is a catalog securable object and `schema` is a
+    schema securable object.
+
+    Catalog:
+        - Python code:
+            SecurableObjects.of_catalog("catalog1", privileges)
+        - REST API:
+            full_name="catalog1", type="CATALOG"
+
+    Schema:
+        - Python code:
+            SecurableObjects.of_schema(catalog, "schema1", privileges)
+        - REST API:
+            full_name="catalog1.schema1", type="SCHEMA"
+
+    Table:
+        - Python code:
+            SecurableObjects.of_table(schema, "table1", privileges)
+        - REST API:
+            full_name="catalog1.schema1.table1", type="TABLE"
+
+    Topic:
+        - Python code:
+            SecurableObjects.of_topic(schema, "topic1", privileges)
+        - REST API:
+            full_name="catalog1.schema1.topic1", type="TOPIC"
+
+    Fileset:
+        - Python code:
+            SecurableObjects.of_fileset(schema, "fileset1", privileges)
+        - REST API:
+            full_name="catalog1.schema1.fileset1", type="FILESET"
+
+    Metalake:
+        - Python code:
+            SecurableObjects.of_metalake("metalake1", privileges)
+        - REST API:
+            full_name="metalake1", type="METALAKE"
+
+    Notes
+    -----
+    - To represent “all catalogs”, you can use the metalake as the root object.
+    - To grant a privilege on all children, you can assign it to their common 
parent.
+      For example, to grant READ TABLE on all tables under `catalog1.schema1`,
+      simply grant READ TABLE on the schema object itself.
+    """
+
+    @abstractmethod
+    def privileges(self) -> list["Privilege"]:
+        """The privileges of the securable object. For example: If the 
securable object is a table, the
+        privileges could be `READ TABLE`, `WRITE TABLE`, etc. If a schema has 
the privilege of `LOAD
+        TABLE`, it means the role can load all tables of the schema.
+
+        Returns:
+            The privileges of the securable object.
+        """
+        raise NotImplementedError()
+
+
+class SecurableObjects:
+    """The helper class for SecurableObject."""
+
+    class SecurableObjectImpl(MetadataObjects.MetadataObjectImpl, 
SecurableObject):
+        def __init__(
+            self,
+            parent: str,
+            name: str,
+            type_: MetadataObject.Type,
+            privileges: list[Privilege],
+        ) -> None:
+            super().__init__(name, type_, parent)
+            self._privileges = list(set(privileges))
+
+        @staticmethod
+        def is_equal_collection(c1: Collection, c2: Collection) -> bool:
+            if c1 is c2:
+                return True
+            if c1 is None or c2 is None:
+                return False
+
+            return Counter(c1) == Counter(c2)
+
+        def __hash__(self) -> int:
+            return hash((super().__hash__(), self._privileges))
+
+        def __str__(self) -> str:
+            privileges_str = ",".join(
+                f"[{p.simple_string()}]" for p in self._privileges
+            )
+
+            return (
+                f"SecurableObject: [fullName={self.full_name()}], "
+                f"[type={self.type()}], [privileges={privileges_str}]"
+            )
+
+        def __eq__(self, other: object) -> bool:
+            if other is self:
+                return True
+            if not isinstance(other, SecurableObject):
+                return False
+
+            return super().__eq__(
+                other
+            ) and SecurableObjects.SecurableObjectImpl.is_equal_collection(
+                self.privileges(), other.privileges()
+            )
+
+        def privileges(self) -> list[Privilege]:
+            return self._privileges
+
+    @staticmethod
+    def of_metalake(
+        metalake: str,
+        privileges: list[Privilege],
+    ) -> SecurableObjectImpl:
+        """
+        Create the metalake SecurableObject with the given metalake name and 
privileges.
+
+        Args:
+            metalake (str): The metalake name.
+            privileges (list[Privilege]): The privileges of the metalake.
+
+        Returns:
+            SecurableObjectImpl: The created metalake SecurableObject.
+        """
+        return SecurableObjects.of(
+            MetadataObject.Type.METALAKE,
+            [metalake],
+            privileges,
+        )
+
+    @staticmethod
+    def of_catalog(
+        catalog: str,
+        privileges: list[Privilege],
+    ) -> SecurableObjectImpl:
+        """
+        Create the catalog SecurableObject with the given catalog name and 
privileges.
+
+        Args:
+            catalog (str): The catalog name
+            privileges (list[Privilege]): The privileges of the catalog.
+
+        Returns:
+            SecurableObjectImpl: The created catalog SecurableObject.
+        """
+        return SecurableObjects.of(
+            MetadataObject.Type.CATALOG,
+            [catalog],
+            privileges,
+        )
+
+    @staticmethod
+    def of_schema(
+        catalog: SecurableObject,
+        schema: str,
+        privileges: list[Privilege],
+    ) -> SecurableObjectImpl:
+        """
+        Create the schema SecurableObject with the given securable catalog 
object, schema name and privileges.
+
+        Args:
+            catalog (SecurableObject): The catalog securable object.
+            schema (str): The schema name.
+            privileges (list[Privilege]): The privileges of the schema.
+
+        Returns:
+            SecurableObjectImpl: The created schema SecurableObject.
+        """
+        return SecurableObjects.of(
+            MetadataObject.Type.SCHEMA,
+            [catalog.full_name(), schema],
+            privileges,
+        )
+
+    @staticmethod
+    def of_table(
+        schema: SecurableObject,
+        table: str,
+        privileges: list[Privilege],
+    ) -> SecurableObjectImpl:
+        """
+        Create the table SecurableObject with the given securable schema 
object, table name and privileges.
+
+        Args:
+            schema (SecurableObject): The schema securable object.
+            table (str): The table name.
+            privileges (list[Privilege]): The privileges of the table.
+
+        Returns:
+            SecurableObjectImpl: The created table SecurableObject.
+        """
+        names = [*schema.full_name().split("."), table]
+        return SecurableObjects.of(
+            MetadataObject.Type.TABLE,
+            names,
+            privileges,
+        )
+
+    @staticmethod
+    def of_topic(
+        schema: SecurableObject,
+        topic: str,
+        privileges: list[Privilege],
+    ) -> SecurableObjectImpl:
+        """
+        Create the topic SecurableObject with the given securable schema 
object, topic name and privileges.
+
+        Args:
+            schema (SecurableObject): The schema securable object
+            topic (str): The topic name
+            privileges (list[Privilege]): The privileges of the topic
+
+        Returns:
+            SecurableObjectImpl: The created topic SecurableObject
+        """
+        names = [*schema.full_name().split("."), topic]
+        return SecurableObjects.of(
+            MetadataObject.Type.TOPIC,
+            names,
+            privileges,
+        )
+
+    @staticmethod
+    def of_fileset(
+        schema: SecurableObject,
+        fileset: str,
+        privileges: list[Privilege],
+    ) -> SecurableObjectImpl:
+        """
+        Create the fileset SecurableObject with the given securable schema 
object, fileset name and privileges.
+
+        Args:
+            schema (SecurableObject): The schema securable object
+            fileset (str): The fileset name
+            privileges (list[Privilege]): The privileges of the fileset
+
+        Returns:
+            SecurableObjectImpl: The created fileset SecurableObject.
+        """
+        names = [*schema.full_name().split("."), fileset]
+        return SecurableObjects.of(
+            MetadataObject.Type.FILESET,
+            names,
+            privileges,
+        )
+
+    @staticmethod
+    def of_model(
+        schema: SecurableObject,
+        model: str,
+        privileges: list[Privilege],
+    ) -> SecurableObjectImpl:
+        """
+        Create the model SecurableObject with the given securable schema 
object, model name and privileges.
+
+        Args:
+            schema (SecurableObject): The schema securable object.
+            model (str): The model name.
+            privileges (list[Privilege]): The privileges of the model.
+
+        Returns:
+            SecurableObjectImpl: The created model SecurableObject.
+        """
+        names = [*schema.full_name().split("."), model]
+        return SecurableObjects.of(
+            MetadataObject.Type.MODEL,
+            names,
+            privileges,
+        )
+
+    @staticmethod
+    def of_tag(
+        tag_name: str,
+        privileges: list[Privilege],
+    ) -> SecurableObjectImpl:
+        """
+        Create the tag SecurableObject with the given tag name and privileges.
+
+        Args:
+            tag_name (str): The tag name
+            privileges (list[Privilege]): The privileges of the tag
+
+        Returns:
+            SecurableObjectImpl: The created tag SecurableObject.
+        """
+        return SecurableObjects.of(
+            MetadataObject.Type.TAG,
+            [tag_name],
+            privileges,
+        )
+
+    @staticmethod
+    def of_policy(
+        policy: str,
+        privileges: list[Privilege],
+    ) -> SecurableObjectImpl:
+        """
+        Create the policy SecurableObject with the given policy name and 
privileges.
+
+        Args:
+            policy (str): The policy name
+            privileges (list[Privilege]): The privileges of the policy
+
+        Returns:
+            SecurableObjectImpl: The created policy SecurableObject.
+        """
+        return SecurableObjects.of(
+            MetadataObject.Type.POLICY,
+            [policy],
+            privileges,
+        )
+
+    @staticmethod
+    def of_job_template(
+        job_template: str,
+        privileges: list[Privilege],
+    ) -> SecurableObjectImpl:
+        """
+        Create the job template SecurableObject with the given job template 
name and privileges.
+
+        Args:
+            job_template (str): The job template name
+            privileges (list[Privilege]): The privileges of the job template
+
+        Returns:
+            SecurableObjectImpl: The created job template SecurableObject.
+        """
+        return SecurableObjects.of(
+            MetadataObject.Type.JOB_TEMPLATE,
+            [job_template],
+            privileges,
+        )
+
+    @staticmethod
+    def parse(
+        full_name: str,
+        type_: MetadataObject.Type,
+        privileges: list[Privilege],
+    ) -> SecurableObject:
+        """
+        Create a SecurableObject from the given full name.
+
+        Args:
+            full_name (str): The full name of securable object.
+            type_ (MetadataObject.Type): The securable object type.
+            privileges (list[Privilege]): The securable object privileges.
+
+        Returns:
+            SecurableObject: The created SecurableObject.
+        """
+        metadata_object = MetadataObjects.parse(full_name, type_)
+        return SecurableObjects.SecurableObjectImpl(
+            metadata_object.parent(),
+            metadata_object.name(),
+            type_,
+            privileges,
+        )
+
+    @staticmethod
+    def of(
+        type_: MetadataObject.Type,
+        names: list[str],
+        privileges: list[Privilege],
+    ) -> SecurableObject:
+        """
+        Create the SecurableObject with the given names.
+
+        Args:
+            type_ (MetadataObject.Type): The securable object type.
+            names (list[str]): The names of the securable object.
+            privileges (list[Privilege]): The securable object privileges.
+
+        Returns:
+            SecurableObject: The created SecurableObject.
+        """
+        metadata_object = MetadataObjects.of(names, type_)
+        return SecurableObjects.SecurableObjectImpl(
+            metadata_object.parent(),
+            metadata_object.name(),
+            type_,
+            privileges,
+        )
diff --git 
a/clients/client-python/gravitino/api/authorization/supports_roles.py 
b/clients/client-python/gravitino/api/authorization/supports_roles.py
new file mode 100644
index 0000000000..02a81bfc8f
--- /dev/null
+++ b/clients/client-python/gravitino/api/authorization/supports_roles.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+
+
+class SupportsRoles(ABC):
+    """Interface for supporting list role names for objects. This interface 
will be mixed with metadata
+    objects to provide listing role operations.
+    """
+
+    @abstractmethod
+    def list_binding_role_names(self) -> list[str]:
+        """
+        List all the role names associated with this metadata object.
+
+        Raises:
+            NotImplementedError: If the method is not implemented.
+
+        Returns:
+            list[str]: The role name list associated with this metadata object.
+        """
+        raise NotImplementedError()
diff --git a/clients/client-python/gravitino/api/metadata_object.py 
b/clients/client-python/gravitino/api/metadata_object.py
index 20a498c0ad..2c492bb42e 100644
--- a/clients/client-python/gravitino/api/metadata_object.py
+++ b/clients/client-python/gravitino/api/metadata_object.py
@@ -72,6 +72,18 @@ class MetadataObject(ABC):
         MODEL = "model"
         """A model is mapped to the model artifact in ML."""
 
+        TAG = "tag"
+        """A tag is used to help manage other metadata objects."""
+
+        POLICY = "policy"
+        """A policy can be associated with a metadata object for data 
governance and similar purposes."""
+
+        JOB = "job"
+        """A job represents a data processing task in Gravitino."""
+
+        JOB_TEMPLATE = "job_template"
+        """A job template represents a reusable template for creating jobs in 
Gravitino."""
+
     @abstractmethod
     def parent(self) -> Optional[str]:
         """The parent full name of the object.
diff --git a/clients/client-python/gravitino/api/metadata_objects.py 
b/clients/client-python/gravitino/api/metadata_objects.py
index 8a29c612e1..7d7b4babfe 100644
--- a/clients/client-python/gravitino/api/metadata_objects.py
+++ b/clients/client-python/gravitino/api/metadata_objects.py
@@ -36,6 +36,10 @@ class MetadataObjects:
             MetadataObject.Type.METALAKE,
             MetadataObject.Type.CATALOG,
             MetadataObject.Type.ROLE,
+            MetadataObject.Type.TAG,
+            MetadataObject.Type.POLICY,
+            MetadataObject.Type.JOB,
+            MetadataObject.Type.JOB_TEMPLATE,
         },
         2: {MetadataObject.Type.SCHEMA},
         3: {
diff --git a/clients/client-python/tests/unittests/authorization/__init__.py 
b/clients/client-python/tests/unittests/authorization/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/clients/client-python/tests/unittests/authorization/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/clients/client-python/tests/unittests/authorization/test_securable_objects.py 
b/clients/client-python/tests/unittests/authorization/test_securable_objects.py
new file mode 100644
index 0000000000..35a1963b5c
--- /dev/null
+++ 
b/clients/client-python/tests/unittests/authorization/test_securable_objects.py
@@ -0,0 +1,385 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from __future__ import annotations
+
+import unittest
+
+from gravitino.api.authorization.privileges import Privilege
+from gravitino.api.authorization.securable_objects import SecurableObjects
+from gravitino.api.metadata_object import MetadataObject
+
+
+class MockPrivilege(Privilege):
+    """Minimal Privilege used by these tests.
+
+    It stores the name and condition it is constructed with and reports
+    itself as bindable to every metadata object type.
+    """
+
+    def __init__(
+        self,
+        name: Privilege.Name,
+        condition: Privilege.Condition,
+    ) -> None:
+        self._name = name
+        self._condition = condition
+
+    def name(self) -> Privilege.Name:
+        return self._name
+
+    def simple_string(self) -> str:
+        # "<condition value>_<privilege name>", both parts lower-cased.
+        condition_part = self._condition.value.lower()
+        name_part = self._name.name.lower()
+        return f"{condition_part}_{name_part}"
+
+    def condition(self) -> Privilege.Condition:
+        return self._condition
+
+    def can_bind_to(self, obj_type: MetadataObject.Type) -> bool:
+        # The mock places no restriction on the target object type.
+        return True
+
+    def __repr__(self) -> str:
+        return f"MockPrivilege(name={self._name}, condition={self._condition})"
+
+
+class TestSecurableObject(unittest.TestCase):
+    def test_metalake_object(self) -> None:
+        # The dedicated metalake factory and the generic factory are expected
+        # to produce equal objects when given the same privilege instance.
+        privilege = MockPrivilege(
+            Privilege.Name.CREATE_CATALOG,
+            Privilege.Condition.ALLOW,
+        )
+
+        from_factory = SecurableObjects.of_metalake("metalake", [privilege])
+        from_generic = SecurableObjects.of(
+            MetadataObject.Type.METALAKE,
+            ["metalake"],
+            [privilege],
+        )
+
+        self.assertEqual("metalake", from_factory.full_name())
+        self.assertEqual(MetadataObject.Type.METALAKE, from_factory.type())
+        self.assertEqual(from_factory, from_generic)
+
+    def test_catalog_object(self) -> None:
+        # The dedicated catalog factory and the generic factory are expected
+        # to produce equal objects when given the same privilege instance.
+        privilege = MockPrivilege(
+            Privilege.Name.USE_CATALOG, Privilege.Condition.ALLOW
+        )
+
+        from_factory = SecurableObjects.of_catalog("catalog", [privilege])
+        from_generic = SecurableObjects.of(
+            MetadataObject.Type.CATALOG, ["catalog"], [privilege]
+        )
+
+        self.assertEqual("catalog", from_factory.full_name())
+        self.assertEqual(MetadataObject.Type.CATALOG, from_factory.type())
+        self.assertEqual(from_factory, from_generic)
+
+    def test_schema_object(self) -> None:
+        # Parent catalog that the schema is attached to.
+        use_catalog = MockPrivilege(
+            Privilege.Name.USE_CATALOG, Privilege.Condition.ALLOW
+        )
+        parent_catalog = SecurableObjects.of_catalog("catalog", [use_catalog])
+
+        # Schema built from its parent versus built from the flat name list;
+        # both receive the same privilege instance.
+        privilege = MockPrivilege(
+            Privilege.Name.USE_SCHEMA, Privilege.Condition.ALLOW
+        )
+        from_parent = SecurableObjects.of_schema(
+            parent_catalog, "schema", [privilege]
+        )
+        from_generic = SecurableObjects.of(
+            MetadataObject.Type.SCHEMA, ["catalog", "schema"], [privilege]
+        )
+
+        self.assertEqual("catalog.schema", from_parent.full_name())
+        self.assertEqual(MetadataObject.Type.SCHEMA, from_parent.type())
+        self.assertEqual(from_parent, from_generic)
+
+    def test_table_object(self) -> None:
+        # Build the catalog -> schema parent chain first.
+        use_catalog = MockPrivilege(
+            Privilege.Name.USE_CATALOG, Privilege.Condition.ALLOW
+        )
+        parent_catalog = SecurableObjects.of_catalog("catalog", [use_catalog])
+        use_schema = MockPrivilege(
+            Privilege.Name.USE_SCHEMA, Privilege.Condition.ALLOW
+        )
+        parent_schema = SecurableObjects.of_schema(
+            parent_catalog, "schema", [use_schema]
+        )
+
+        # Table built from its parent versus built from the flat name list;
+        # both receive the same privilege instance.
+        privilege = MockPrivilege(
+            Privilege.Name.SELECT_TABLE, Privilege.Condition.ALLOW
+        )
+        from_parent = SecurableObjects.of_table(
+            parent_schema, "table", [privilege]
+        )
+        from_generic = SecurableObjects.of(
+            MetadataObject.Type.TABLE,
+            ["catalog", "schema", "table"],
+            [privilege],
+        )
+
+        self.assertEqual("catalog.schema.table", from_parent.full_name())
+        self.assertEqual(MetadataObject.Type.TABLE, from_parent.type())
+        self.assertEqual(from_parent, from_generic)
+
+    def test_fileset_object(self) -> None:
+        # Build the catalog -> schema parent chain first.
+        use_catalog = MockPrivilege(
+            Privilege.Name.USE_CATALOG, Privilege.Condition.ALLOW
+        )
+        parent_catalog = SecurableObjects.of_catalog("catalog", [use_catalog])
+        use_schema = MockPrivilege(
+            Privilege.Name.USE_SCHEMA, Privilege.Condition.ALLOW
+        )
+        parent_schema = SecurableObjects.of_schema(
+            parent_catalog, "schema", [use_schema]
+        )
+
+        # Fileset built from its parent versus built from the flat name list;
+        # both receive the same privilege instance (a USE_SCHEMA mock here).
+        privilege = MockPrivilege(
+            Privilege.Name.USE_SCHEMA, Privilege.Condition.ALLOW
+        )
+        from_parent = SecurableObjects.of_fileset(
+            parent_schema, "fileset", [privilege]
+        )
+        from_generic = SecurableObjects.of(
+            MetadataObject.Type.FILESET,
+            ["catalog", "schema", "fileset"],
+            [privilege],
+        )
+
+        self.assertEqual("catalog.schema.fileset", from_parent.full_name())
+        self.assertEqual(MetadataObject.Type.FILESET, from_parent.type())
+        self.assertEqual(from_parent, from_generic)
+
+    def test_topic_object(self) -> None:
+        # Build the catalog -> schema parent chain first.
+        use_catalog = MockPrivilege(
+            Privilege.Name.USE_CATALOG, Privilege.Condition.ALLOW
+        )
+        parent_catalog = SecurableObjects.of_catalog("catalog", [use_catalog])
+        use_schema = MockPrivilege(
+            Privilege.Name.USE_SCHEMA, Privilege.Condition.ALLOW
+        )
+        parent_schema = SecurableObjects.of_schema(
+            parent_catalog, "schema", [use_schema]
+        )
+
+        # Topic built from its parent versus built from the flat name list;
+        # both receive the same privilege instance.
+        privilege = MockPrivilege(
+            Privilege.Name.CONSUME_TOPIC, Privilege.Condition.ALLOW
+        )
+        from_parent = SecurableObjects.of_topic(
+            parent_schema, "topic", [privilege]
+        )
+        from_generic = SecurableObjects.of(
+            MetadataObject.Type.TOPIC,
+            ["catalog", "schema", "topic"],
+            [privilege],
+        )
+
+        self.assertEqual("catalog.schema.topic", from_parent.full_name())
+        self.assertEqual(MetadataObject.Type.TOPIC, from_parent.type())
+        self.assertEqual(from_parent, from_generic)
+
+    def test_tag_object(self) -> None:
+        # The dedicated tag factory and the generic factory are expected to
+        # produce equal objects when given the same privilege instance.
+        privilege = MockPrivilege(
+            Privilege.Name.APPLY_TAG, Privilege.Condition.ALLOW
+        )
+
+        from_factory = SecurableObjects.of_tag("tag", [privilege])
+        from_generic = SecurableObjects.of(
+            MetadataObject.Type.TAG, ["tag"], [privilege]
+        )
+
+        self.assertEqual("tag", from_factory.full_name())
+        self.assertEqual(MetadataObject.Type.TAG, from_factory.type())
+        self.assertEqual(from_factory, from_generic)
+
+    def test_policy_object(self) -> None:
+        # The dedicated policy factory and the generic factory are expected
+        # to produce equal objects when given the same privilege instance.
+        privilege = MockPrivilege(
+            Privilege.Name.APPLY_POLICY, Privilege.Condition.ALLOW
+        )
+
+        from_factory = SecurableObjects.of_policy("policy", [privilege])
+        from_generic = SecurableObjects.of(
+            MetadataObject.Type.POLICY, ["policy"], [privilege]
+        )
+
+        self.assertEqual("policy", from_factory.full_name())
+        self.assertEqual(MetadataObject.Type.POLICY, from_factory.type())
+        self.assertEqual(from_factory, from_generic)
+
+    def test_job_object(self) -> None:
+        # NOTE(review): despite the method name, this exercises the
+        # JOB_TEMPLATE factory and type, not MetadataObject.Type.JOB.
+        privilege = MockPrivilege(
+            Privilege.Name.RUN_JOB, Privilege.Condition.ALLOW
+        )
+
+        from_factory = SecurableObjects.of_job_template(
+            "job_template", [privilege]
+        )
+        from_generic = SecurableObjects.of(
+            MetadataObject.Type.JOB_TEMPLATE, ["job_template"], [privilege]
+        )
+
+        self.assertEqual("job_template", from_factory.full_name())
+        self.assertEqual(
+            MetadataObject.Type.JOB_TEMPLATE,
+            from_factory.type(),
+        )
+        self.assertEqual(from_factory, from_generic)
+
+    def test_metalake_object_with_error(self) -> None:
+        # Two names are passed for a METALAKE object; a ValueError whose
+        # message mentions the required name length is expected.
+        metalake_privilege = MockPrivilege(
+            Privilege.Name.CREATE_CATALOG, Privilege.Condition.ALLOW
+        )
+        with self.assertRaises(ValueError) as context:
+            # Privileges are passed as a list, like every other call to
+            # SecurableObjects.of in this file.
+            SecurableObjects.of(
+                MetadataObject.Type.METALAKE,
+                ["metalake", "catalog"],
+                [metalake_privilege],
+            )
+
+        # Checked after the `with` block: a statement placed after the
+        # raising call inside the block would never run.
+        self.assertIn(
+            "the length of names must be 1",
+            str(context.exception),
+        )
+
+    def test_catalog_object_with_error(self) -> None:
+        # Two names are passed for a CATALOG object; a ValueError whose
+        # message mentions the required name length is expected.
+        catalog_privilege = MockPrivilege(
+            Privilege.Name.USE_CATALOG,
+            Privilege.Condition.ALLOW,
+        )
+
+        with self.assertRaises(ValueError) as context:
+            # Privileges are passed as a list, like every other call to
+            # SecurableObjects.of in this file.
+            SecurableObjects.of(
+                MetadataObject.Type.CATALOG,
+                ["metalake", "catalog"],
+                [catalog_privilege],
+            )
+
+        # Checked after the `with` block: a statement placed after the
+        # raising call inside the block would never run.
+        self.assertIn(
+            "the length of names must be 1",
+            str(context.exception),
+        )
+
+    def test_table_object_with_error(self) -> None:
+        # A single name is passed for a TABLE object; a ValueError whose
+        # message mentions the required name length is expected.
+        table_privilege = MockPrivilege(
+            Privilege.Name.SELECT_TABLE,
+            Privilege.Condition.ALLOW,
+        )
+
+        with self.assertRaises(ValueError) as context:
+            # Privileges are passed as a list, like every other call to
+            # SecurableObjects.of in this file.
+            SecurableObjects.of(
+                MetadataObject.Type.TABLE,
+                ["catalog"],
+                [table_privilege],
+            )
+
+        # Checked after the `with` block: a statement placed after the
+        # raising call inside the block would never run.
+        self.assertIn(
+            "the length of names must be 3",
+            str(context.exception),
+        )
+
+    def test_topic_object_with_error(self) -> None:
+        # A single name is passed for a TOPIC object; a ValueError whose
+        # message mentions the required name length is expected.
+        topic_privilege = MockPrivilege(
+            Privilege.Name.CONSUME_TOPIC,
+            Privilege.Condition.ALLOW,
+        )
+
+        with self.assertRaises(ValueError) as context:
+            # Privileges are passed as a list, like every other call to
+            # SecurableObjects.of in this file.
+            SecurableObjects.of(
+                MetadataObject.Type.TOPIC,
+                ["metalake"],
+                [topic_privilege],
+            )
+
+        # Checked after the `with` block: a statement placed after the
+        # raising call inside the block would never run.
+        self.assertIn(
+            "the length of names must be 3",
+            str(context.exception),
+        )
+
+    def test_fileset_object_with_error(self) -> None:
+        # A single name is passed for a FILESET object; a ValueError whose
+        # message mentions the required name length is expected.
+        fileset_privilege = MockPrivilege(
+            Privilege.Name.USE_SCHEMA,
+            Privilege.Condition.ALLOW,
+        )
+        with self.assertRaises(ValueError) as context:
+            # Privileges are passed as a list, like every other call to
+            # SecurableObjects.of in this file.
+            SecurableObjects.of(
+                MetadataObject.Type.FILESET,
+                ["metalake"],
+                [fileset_privilege],
+            )
+
+        # Checked after the `with` block: a statement placed after the
+        # raising call inside the block would never run.
+        self.assertIn(
+            "the length of names must be 3",
+            str(context.exception),
+        )
+
+    def test_schema_object_with_error(self) -> None:
+        # Three names are passed for a SCHEMA object; a ValueError whose
+        # message mentions the required name length is expected.
+        schema_privilege = MockPrivilege(
+            Privilege.Name.USE_SCHEMA,
+            Privilege.Condition.ALLOW,
+        )
+        with self.assertRaises(ValueError) as context:
+            # Privileges are passed as a list, like every other call to
+            # SecurableObjects.of in this file.
+            SecurableObjects.of(
+                MetadataObject.Type.SCHEMA,
+                ["catalog", "schema", "table"],
+                [schema_privilege],
+            )
+
+        # Checked after the `with` block: a statement placed after the
+        # raising call inside the block would never run.
+        self.assertIn(
+            "the length of names must be 2",
+            str(context.exception),
+        )


Reply via email to