This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new ffc4c04979 [#9411] feat(client-python): add minimal viable
implementation of relational catalog (#9478)
ffc4c04979 is described below
commit ffc4c049797b3599959922dc6eb54a7fca15c023
Author: George T. C. Lai <[email protected]>
AuthorDate: Mon Dec 22 11:17:14 2025 +0800
[#9411] feat(client-python): add minimal viable implementation of
relational catalog (#9478)
### What changes were proposed in this pull request?
Add the minimal viable implementation for class RelationalCatalog. The
following changes were included in this PR.
- add class `TableCreateRequest`
- add class `TableResponse`
- add class `TableErrorHandler`
- implement `DTOConverters.to_dtos` for columns, indexes, sort orders,
and partitioning
- implement `DTOConverters.to_dto` for distribution
- add interface `TableCatalog`
- add class `RelationalCatalog`
- implement method `create_table`
- implement method `list_tables`
- implement method `load_table`
### Why are the changes needed?
We shall carry out the minimal viable implementation for class
RelationalCatalog to facilitate the integration tests for
RelationalTable.
Fix: #9411
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
---------
Signed-off-by: George T. C. Lai <[email protected]>
---
.../gravitino/api/rel/table_catalog.py | 180 +++++++++++++
.../gravitino/client/dto_converters.py | 24 +-
.../gravitino/client/relational_catalog.py | 209 ++++++++++++++++
.../gravitino/dto/requests/table_create_request.py | 133 ++++++++++
.../gravitino/dto/responses/table_response.py | 46 ++++
.../gravitino/dto/util/dto_converters.py | 194 +++++++++++++-
clients/client-python/gravitino/exceptions/base.py | 4 +
.../exceptions/handlers/table_error_handler.py | 72 ++++++
.../unittests/dto/util/test_dto_converters.py | 216 ++++++++++++++++
.../tests/unittests/test_error_handler.py | 78 ++++++
.../tests/unittests/test_relational_catalog.py | 278 +++++++++++++++++++++
.../client-python/tests/unittests/test_requests.py | 176 +++++++++++++
.../tests/unittests/test_responses.py | 138 ++++++++++
13 files changed, 1737 insertions(+), 11 deletions(-)
diff --git a/clients/client-python/gravitino/api/rel/table_catalog.py
b/clients/client-python/gravitino/api/rel/table_catalog.py
new file mode 100644
index 0000000000..e8a2ec6814
--- /dev/null
+++ b/clients/client-python/gravitino/api/rel/table_catalog.py
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from abc import ABC, abstractmethod
+from contextlib import suppress
+from typing import Optional
+
+from gravitino.api.rel.column import Column
+from gravitino.api.rel.expressions.distributions.distribution import
Distribution
+from gravitino.api.rel.expressions.sorts.sort_order import SortOrder
+from gravitino.api.rel.expressions.transforms.transform import Transform
+from gravitino.api.rel.indexes.index import Index
+from gravitino.api.rel.table import Table
+from gravitino.exceptions.base import (
+ NoSuchTableException,
+ UnsupportedOperationException,
+)
+from gravitino.name_identifier import NameIdentifier
+from gravitino.namespace import Namespace
+
+
+class TableCatalog(ABC):
+ """The `TableCatalog` interface defines the public API for managing tables
in a schema.
+
+ If the catalog implementation supports tables, it must implement this
interface.
+ """
+
+ @abstractmethod
+ def list_tables(self, namespace: Namespace) -> list[NameIdentifier]:
+ """List the tables in a namespace from the catalog.
+
+ Args:
+ namespace (Namespace): A namespace.
+
+ Returns:
+ list[NameIdentifier]: An array of table identifiers in the
namespace.
+
+ Raises:
+ NoSuchSchemaException: If the schema does not exist.
+ """
+
+ @abstractmethod
+ def load_table(self, identifier: NameIdentifier) -> Table:
+ """Load table metadata by `NameIdentifier` from the catalog.
+
+ Args:
+ identifier (NameIdentifier): A table identifier.
+
+ Returns:
+ Table: The table metadata.
+
+ Raises:
+ NoSuchTableException: If the table does not exist.
+ """
+
+ def table_exists(self, identifier: NameIdentifier) -> bool:
+ """Check if a table exists using a `NameIdentifier` from the catalog.
+
+ Args:
+ identifier (NameIdentifier): A table identifier.
+
+ Returns:
+ bool: `True` if the table exists, `False` otherwise.
+ """
+ with suppress(NoSuchTableException):
+ self.load_table(identifier)
+ return True
+ return False
+
+ @abstractmethod
+ def create_table(
+ self,
+ identifier: NameIdentifier,
+ columns: list[Column],
+ comment: Optional[str] = None,
+ properties: Optional[dict[str, str]] = None,
+ partitioning: Optional[list[Transform]] = None,
+ distribution: Optional[Distribution] = None,
+ sort_orders: Optional[list[SortOrder]] = None,
+ indexes: Optional[list[Index]] = None,
+ ) -> Table:
+ """Create a table in the catalog.
+
+ Args:
+ identifier (NameIdentifier):
+ A table identifier.
+ columns (list[Column]):
+ The columns of the new table.
+ comment (str, optional):
+ The table comment. Defaults to `None`.
+ properties (dict[str, str], optional):
+ The table properties. Defaults to `None`.
+ partitioning (Optional[list[Transform]], optional):
+ The table partitioning. Defaults to None.
+ distribution (Optional[Distribution], optional):
+ The distribution of the table. Defaults to `None`.
+ sort_orders (Optional[list[SortOrder]], optional):
+ The sort orders of the table. Defaults to `None`.
+ indexes (Optional[list[Index]], optional):
+ The table indexes. Defaults to `None`.
+
+ Raises:
+ NoSuchSchemaException:
+ If the schema does not exist.
+ TableAlreadyExistsException:
+ If the table already exists.
+
+ Returns:
+ Table:
+ The created table metadata.
+ """
+
+ @abstractmethod
+ def alter_table(self, identifier: NameIdentifier, *changes) -> Table:
+ """Alter a table in the catalog.
+
+ Args:
+ identifier (NameIdentifier): A table identifier.
+ *changes:
+ Table changes (defined in class `TableChange`) to apply to the
table.
+
+ Returns:
+ Table: The updated table metadata.
+
+ Raises:
+ NoSuchTableException:
+ If the table does not exist.
+ IllegalArgumentException:
+ If the change is rejected by the implementation.
+ """
+
+ @abstractmethod
+ def drop_table(self, identifier: NameIdentifier) -> bool:
+ """Drop a table from the catalog.
+
+ Removes both the metadata and the directory associated with the table
from the
+ file system if the table is not an external table. In case of an
external table,
+ only the associated metadata is removed.
+
+ Args:
+ identifier (NameIdentifier): A table identifier.
+
+ Returns:
+ bool: `True` if the table is dropped, `False` if the table does
not exist.
+ """
+
+ def purge_table(self, identifier: NameIdentifier) -> bool:
+ """Drop a table from the catalog and completely remove its data.
+
+ Removes both the metadata and the directory associated with the table
completely,
+ skipping the trash. If the table is an external table or the catalog does
not support
+ purging tables, `UnsupportedOperationException` is thrown. If the catalog
supports
+ purging a table, this method should be overridden. The default
implementation throws
+ an `UnsupportedOperationException`.
+
+ Args:
+ identifier (NameIdentifier): A table identifier.
+
+ Raises:
+ UnsupportedOperationException: If the catalog does not support
purging a table.
+
+ Returns:
+ bool: `True` if the table is purged, `False` if the table does not
exist.
+ """
+ raise UnsupportedOperationException("purgeTable not supported.")
diff --git a/clients/client-python/gravitino/client/dto_converters.py
b/clients/client-python/gravitino/client/dto_converters.py
index 9b0c3775c6..bc24cde268 100644
--- a/clients/client-python/gravitino/client/dto_converters.py
+++ b/clients/client-python/gravitino/client/dto_converters.py
@@ -19,18 +19,20 @@ from gravitino.api.catalog import Catalog
from gravitino.api.catalog_change import CatalogChange
from gravitino.api.job.job_template import JobTemplate, JobType
from gravitino.api.job.job_template_change import (
- TemplateUpdate,
- ShellTemplateUpdate,
- SparkTemplateUpdate,
JobTemplateChange,
RenameJobTemplate,
- UpdateJobTemplateComment,
+ ShellTemplateUpdate,
+ SparkTemplateUpdate,
+ TemplateUpdate,
UpdateJobTemplate,
+ UpdateJobTemplateComment,
)
from gravitino.api.job.shell_job_template import ShellJobTemplate
from gravitino.api.job.spark_job_template import SparkJobTemplate
+from gravitino.api.metalake_change import MetalakeChange
from gravitino.client.fileset_catalog import FilesetCatalog
from gravitino.client.generic_model_catalog import GenericModelCatalog
+from gravitino.client.relational_catalog import RelationalCatalog
from gravitino.dto.catalog_dto import CatalogDTO
from gravitino.dto.job.job_template_dto import JobTemplateDTO
from gravitino.dto.job.shell_job_template_dto import ShellJobTemplateDTO
@@ -46,9 +48,8 @@ from gravitino.dto.requests.job_template_update_request
import (
UpdateJobTemplateContentRequest,
)
from gravitino.dto.requests.metalake_update_request import
MetalakeUpdateRequest
-from gravitino.api.metalake_change import MetalakeChange
-from gravitino.utils import HTTPClient
from gravitino.namespace import Namespace
+from gravitino.utils import HTTPClient
class DTOConverters:
@@ -100,6 +101,17 @@ class DTOConverters:
audit=catalog.audit_info(),
rest_client=client,
)
+ if catalog.type() == Catalog.Type.RELATIONAL:
+ return RelationalCatalog(
+ catalog_namespace=namespace,
+ name=catalog.name(),
+ catalog_type=catalog.type(),
+ provider=catalog.provider(),
+ comment=catalog.comment(),
+ properties=catalog.properties(),
+ audit=catalog.audit_info(),
+ rest_client=client,
+ )
raise NotImplementedError("Unsupported catalog type: " +
str(catalog.type()))
diff --git a/clients/client-python/gravitino/client/relational_catalog.py
b/clients/client-python/gravitino/client/relational_catalog.py
new file mode 100644
index 0000000000..54d0265591
--- /dev/null
+++ b/clients/client-python/gravitino/client/relational_catalog.py
@@ -0,0 +1,209 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from gravitino.api.catalog import Catalog
+from gravitino.api.rel.column import Column
+from gravitino.api.rel.expressions.distributions.distribution import
Distribution
+from gravitino.api.rel.expressions.sorts.sort_order import SortOrder
+from gravitino.api.rel.expressions.transforms.transform import Transform
+from gravitino.api.rel.indexes.index import Index
+from gravitino.api.rel.table import Table
+from gravitino.api.rel.table_catalog import TableCatalog
+from gravitino.client.base_schema_catalog import BaseSchemaCatalog
+from gravitino.client.relational_table import RelationalTable
+from gravitino.dto.audit_dto import AuditDTO
+from gravitino.dto.rel.distribution_dto import DistributionDTO
+from gravitino.dto.requests.table_create_request import TableCreateRequest
+from gravitino.dto.responses.entity_list_response import EntityListResponse
+from gravitino.dto.responses.table_response import TableResponse
+from gravitino.dto.util.dto_converters import DTOConverters
+from gravitino.exceptions.handlers.table_error_handler import
TABLE_ERROR_HANDLER
+from gravitino.name_identifier import NameIdentifier
+from gravitino.namespace import Namespace
+from gravitino.rest.rest_utils import encode_string
+from gravitino.utils import HTTPClient
+
+
+class RelationalCatalog(BaseSchemaCatalog, TableCatalog):
+ """Relational catalog is a catalog implementation
+
+ The `RelationalCatalog` supports relational-database-like metadata
operations —
+ for example, listing, creating, updating and deleting schemas and tables. A
relational
+ catalog is under the metalake.
+ """
+
+ def __init__(
+ self,
+ catalog_namespace: Namespace,
+ name: str,
+ catalog_type: Catalog.Type,
+ provider: str,
+ audit: AuditDTO,
+ rest_client: HTTPClient,
+ comment: Optional[str] = None,
+ properties: Optional[dict[str, str]] = None,
+ ):
+ super().__init__(
+ catalog_namespace,
+ name,
+ catalog_type,
+ provider,
+ comment,
+ properties,
+ audit,
+ rest_client,
+ )
+
+ def as_table_catalog(self) -> TableCatalog:
+ """Return this relational catalog as a :class:`TableCatalog`.
+
+ This method returns ``self`` to provide access to table-related
+ operations defined by the :class:`TableCatalog` interface.
+
+ Returns:
+ TableCatalog: The current catalog instance as a ``TableCatalog``.
+ """
+ return self
+
+ def _check_table_name_identifier(self, identifier: NameIdentifier) -> None:
+ """Check whether the `NameIdentifier` of a table is valid.
+
+ Args:
+ identifier (NameIdentifier):
+ The NameIdentifier to check, which should be in "schema.table"
format.
+
+ Raises:
+ IllegalNameIdentifierException: If the NameIdentifier is not valid.
+ """
+ NameIdentifier.check(identifier is not None, "NameIdentifier must not
be null")
+ NameIdentifier.check(
+ identifier.name() is not None and identifier.name() != "",
+ "NameIdentifier name must not be empty",
+ )
+ self._check_table_namespace(identifier.namespace())
+
+ def _check_table_namespace(self, namespace: Namespace) -> None:
+ """Check whether the namespace of a table is valid, which should be
"schema".
+
+ Args:
+ namespace (Namespace): The namespace to check.
+
+ Raises:
+ IllegalNamespaceException: If the Namespace is not valid.
+ """
+ Namespace.check(
+ namespace is not None and namespace.length() == 1,
+ f"Table namespace must be non-null and have 1 level, the input
namespace is {namespace}",
+ )
+
+ def _get_table_full_namespace(self, table_namespace: Namespace) ->
Namespace:
+ """Get the full namespace of the table with the given table's short
namespace (schema name).
+
+ Args:
+ table_namespace (Namespace): The table's short namespace, which is
the schema name.
+
+ Returns:
+ Namespace: full namespace of the table, which is
"metalake.catalog.schema" format.
+ """
+ return Namespace.of(
+ self._catalog_namespace.level(0),
+ self._name,
+ table_namespace.level(0),
+ )
+
+ def _format_table_request_path(self, ns: Namespace) -> str:
+ schema_ns = Namespace.of(ns.level(0), ns.level(1))
+ return (
+ f"{BaseSchemaCatalog.format_schema_request_path(schema_ns)}"
+ f"/{encode_string(ns.level(2))}"
+ "/tables"
+ )
+
+ def create_table(
+ self,
+ identifier: NameIdentifier,
+ columns: list[Column],
+ comment: Optional[str] = None,
+ properties: Optional[dict[str, str]] = None,
+ partitioning: Optional[list[Transform]] = None,
+ distribution: Optional[Distribution] = None,
+ sort_orders: Optional[list[SortOrder]] = None,
+ indexes: Optional[list[Index]] = None,
+ ) -> Table:
+ self._check_table_name_identifier(identifier)
+ req = TableCreateRequest(
+ _name=identifier.name(),
+ _columns=DTOConverters.to_dtos(columns),
+ _comment=comment,
+ _properties=properties,
+ _sort_orders=DTOConverters.to_dtos(sort_orders),
+ _distribution=(
+ DTOConverters.to_dto(distribution)
+ if distribution is not None
+ else DistributionDTO.NONE
+ ),
+ _partitioning=DTOConverters.to_dtos(partitioning),
+ _indexes=DTOConverters.to_dtos(indexes),
+ )
+ req.validate()
+ full_namespace = self._get_table_full_namespace(identifier.namespace())
+ resp = self.rest_client.post(
+ self._format_table_request_path(full_namespace),
+ json=req,
+ error_handler=TABLE_ERROR_HANDLER,
+ )
+ table_resp = TableResponse.from_json(resp.body, infer_missing=True)
+ table_resp.validate()
+ return RelationalTable(full_namespace, table_resp.table(),
self.rest_client)
+
+ def list_tables(self, namespace: Namespace) -> list[NameIdentifier]:
+ self._check_table_namespace(namespace)
+ full_namespace = self._get_table_full_namespace(namespace)
+ resp = self.rest_client.get(
+ self._format_table_request_path(full_namespace),
+ error_handler=TABLE_ERROR_HANDLER,
+ )
+ entity_list_resp = EntityListResponse.from_json(resp.body,
infer_missing=True)
+ entity_list_resp.validate()
+ return [
+ NameIdentifier.of(ident.namespace().level(2), ident.name())
+ for ident in entity_list_resp.identifiers()
+ ]
+
+ def load_table(self, identifier: NameIdentifier) -> Table:
+ self._check_table_name_identifier(identifier)
+ full_namespace = self._get_table_full_namespace(identifier.namespace())
+ resp = self.rest_client.get(
+ f"{self._format_table_request_path(full_namespace)}"
+ f"/{encode_string(identifier.name())}",
+ error_handler=TABLE_ERROR_HANDLER,
+ )
+ table_resp = TableResponse.from_json(resp.body, infer_missing=True)
+ table_resp.validate()
+ return RelationalTable(full_namespace, table_resp.table(),
self.rest_client)
+
+ # TODO: We shall implement the following methods after integration tests
for relational table
+ def drop_table(self, identifier: NameIdentifier) -> bool:
+ raise NotImplementedError("Drop table is not implemented yet.")
+
+ def alter_table(self, identifier: NameIdentifier, *changes) -> Table:
+ raise NotImplementedError("Alter table is not implemented yet.")
+
+ def purge_table(self, identifier: NameIdentifier) -> bool:
+ raise NotImplementedError("Purge table is not implemented yet.")
diff --git
a/clients/client-python/gravitino/dto/requests/table_create_request.py
b/clients/client-python/gravitino/dto/requests/table_create_request.py
new file mode 100644
index 0000000000..e12aa76be6
--- /dev/null
+++ b/clients/client-python/gravitino/dto/requests/table_create_request.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import Optional
+
+from dataclasses_json import config
+
+from gravitino.dto.rel.column_dto import ColumnDTO
+from gravitino.dto.rel.distribution_dto import DistributionDTO
+from gravitino.dto.rel.indexes.index_dto import IndexDTO
+from gravitino.dto.rel.indexes.json_serdes.index_serdes import IndexSerdes
+from gravitino.dto.rel.json_serdes.distribution_serdes import
DistributionSerDes
+from gravitino.dto.rel.json_serdes.sort_order_serdes import SortOrderSerdes
+from gravitino.dto.rel.partitioning.json_serdes.partitioning_serdes import (
+ PartitioningSerdes,
+)
+from gravitino.dto.rel.partitioning.partitioning import Partitioning
+from gravitino.dto.rel.sort_order_dto import SortOrderDTO
+from gravitino.rest.rest_message import RESTRequest
+from gravitino.utils.precondition import Precondition
+
+
+@dataclass
+class TableCreateRequest(RESTRequest): # pylint:
disable=too-many-instance-attributes
+ """Represents a request to create a table."""
+
+ _name: str = field(metadata=config(field_name="name"))
+ _columns: list[ColumnDTO] = field(metadata=config(field_name="columns"))
+ _comment: Optional[str] = field(default=None,
metadata=config(field_name="comment"))
+ _partitioning: Optional[list[Partitioning]] = field(
+ default=None,
+ metadata=config(
+ field_name="partitioning",
+ encoder=lambda items: [
+ PartitioningSerdes.serialize(item) for item in items
+ ],
+ decoder=lambda values: [
+ PartitioningSerdes.deserialize(value) for value in values
+ ],
+ exclude=lambda value: value is None,
+ ),
+ )
+ _distribution: Optional[DistributionDTO] = field(
+ default=None,
+ metadata=config(
+ field_name="distribution",
+ encoder=DistributionSerDes.serialize,
+ decoder=DistributionSerDes.deserialize,
+ exclude=lambda value: value is None,
+ ),
+ )
+ _sort_orders: Optional[list[SortOrderDTO]] = field(
+ default=None,
+ metadata=config(
+ field_name="sortOrders",
+ encoder=lambda items: [SortOrderSerdes.serialize(item) for item in
items],
+ decoder=lambda values: [
+ SortOrderSerdes.deserialize(value) for value in values
+ ],
+ exclude=lambda value: value is None,
+ ),
+ )
+ _indexes: Optional[list[IndexDTO]] = field(
+ default=None,
+ metadata=config(
+ field_name="indexes",
+ encoder=lambda items: [IndexSerdes.serialize(item) for item in
items],
+ decoder=lambda values: [IndexSerdes.deserialize(value) for value
in values],
+ exclude=lambda value: value is None,
+ ),
+ )
+ _properties: Optional[dict[str, str]] = field(
+ default=None,
+ metadata=config(field_name="properties", exclude=lambda value: value
is None),
+ )
+
+ def validate(self):
+ Precondition.check_string_not_empty(
+ self._name, '"name" field is required and cannot be empty'
+ )
+ if self._sort_orders:
+ for sort_order in self._sort_orders:
+ sort_order.validate(self._columns)
+
+ if self._distribution:
+ for expression in self._distribution.expressions():
+ expression.validate(self._columns)
+
+ if self._partitioning:
+ for partitioning in self._partitioning:
+ partitioning.validate(self._columns)
+
+ for column in self._columns:
+ column.validate()
+ auto_increment_cols = [
+ column for column in self._columns if column.auto_increment()
+ ]
+ auto_increment_cols_str = (
+ f"[{','.join(column.name() for column in auto_increment_cols)}]"
+ )
+ Precondition.check_argument(
+ len(auto_increment_cols) <= 1,
+ "Only one column can be auto-incremented. There are multiple
auto-increment "
+ f"columns in your table: {auto_increment_cols_str}",
+ )
+
+ if self._indexes:
+ for index in self._indexes:
+ Precondition.check_argument(
+ index.type() is not None, "Index type cannot be null"
+ )
+ Precondition.check_argument(
+ index.field_names() is not None, "Index fieldNames cannot
be null"
+ )
+ Precondition.check_argument(
+ len(index.field_names()) > 0,
+ "Index fieldNames length must be greater than 0",
+ )
diff --git a/clients/client-python/gravitino/dto/responses/table_response.py
b/clients/client-python/gravitino/dto/responses/table_response.py
new file mode 100644
index 0000000000..b12ca5b18e
--- /dev/null
+++ b/clients/client-python/gravitino/dto/responses/table_response.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+
+from dataclasses_json import config
+
+from gravitino.dto.rel.table_dto import TableDTO
+from gravitino.dto.responses.base_response import BaseResponse
+from gravitino.utils.precondition import Precondition
+
+
+@dataclass
+class TableResponse(BaseResponse):
+ """Represents a response for a table."""
+
+ _table: TableDTO = field(metadata=config(field_name="table"))
+
+ def table(self) -> TableDTO:
+ return self._table
+
+ def validate(self) -> None:
+ Precondition.check_string_not_empty(
+ self._table.name(), "table 'name' must not be null and empty"
+ )
+ Precondition.check_argument(
+ self._table.audit_info() is not None, "table 'audit' must not be
null"
+ )
+ Precondition.check_argument(
+ self._table.partitioning() is not None,
+ "table 'partitions' must not be null",
+ )
diff --git a/clients/client-python/gravitino/dto/util/dto_converters.py
b/clients/client-python/gravitino/dto/util/dto_converters.py
index d3188d045f..f59fdabe0a 100644
--- a/clients/client-python/gravitino/dto/util/dto_converters.py
+++ b/clients/client-python/gravitino/dto/util/dto_converters.py
@@ -29,7 +29,10 @@ from gravitino.api.rel.expressions.literals.literals import
Literals
from gravitino.api.rel.expressions.named_reference import FieldReference,
NamedReference
from gravitino.api.rel.expressions.sorts.sort_order import SortOrder
from gravitino.api.rel.expressions.sorts.sort_orders import SortOrders
-from gravitino.api.rel.expressions.transforms.transform import Transform
+from gravitino.api.rel.expressions.transforms.transform import (
+ SingleFieldTransform,
+ Transform,
+)
from gravitino.api.rel.expressions.transforms.transforms import Transforms
from gravitino.api.rel.expressions.unparsed_expression import
UnparsedExpression
from gravitino.api.rel.indexes.index import Index
@@ -49,10 +52,16 @@ from gravitino.dto.rel.expressions.literal_dto import
LiteralDTO
from gravitino.dto.rel.expressions.unparsed_expression_dto import
UnparsedExpressionDTO
from gravitino.dto.rel.indexes.index_dto import IndexDTO
from gravitino.dto.rel.partitioning.bucket_partitioning_dto import
BucketPartitioningDTO
+from gravitino.dto.rel.partitioning.day_partitioning_dto import
DayPartitioningDTO
from gravitino.dto.rel.partitioning.function_partitioning_dto import (
FunctionPartitioningDTO,
)
+from gravitino.dto.rel.partitioning.hour_partitioning_dto import
HourPartitioningDTO
+from gravitino.dto.rel.partitioning.identity_partitioning_dto import (
+ IdentityPartitioningDTO,
+)
from gravitino.dto.rel.partitioning.list_partitioning_dto import
ListPartitioningDTO
+from gravitino.dto.rel.partitioning.month_partitioning_dto import
MonthPartitioningDTO
from gravitino.dto.rel.partitioning.partitioning import (
Partitioning,
SingleFieldPartitioning,
@@ -61,6 +70,7 @@ from gravitino.dto.rel.partitioning.range_partitioning_dto
import RangePartition
from gravitino.dto.rel.partitioning.truncate_partitioning_dto import (
TruncatePartitioningDTO,
)
+from gravitino.dto.rel.partitioning.year_partitioning_dto import
YearPartitioningDTO
from gravitino.dto.rel.partitions.identity_partition_dto import
IdentityPartitionDTO
from gravitino.dto.rel.partitions.list_partition_dto import ListPartitionDTO
from gravitino.dto.rel.partitions.partition_dto import PartitionDTO
@@ -73,6 +83,14 @@ from gravitino.exceptions.base import
IllegalArgumentException
class DTOConverters:
"""Utility class for converting between DTOs and domain objects."""
+ _SINGLE_FIELD_TRANSFORM_TYPES = {
+ Transforms.NAME_OF_IDENTITY: IdentityPartitioningDTO,
+ Transforms.NAME_OF_YEAR: YearPartitioningDTO,
+ Transforms.NAME_OF_MONTH: MonthPartitioningDTO,
+ Transforms.NAME_OF_DAY: DayPartitioningDTO,
+ Transforms.NAME_OF_HOUR: HourPartitioningDTO,
+ }
+
@staticmethod
def from_function_arg(arg: FunctionArg) -> Expression:
"""Converts a FunctionArg DTO to an Expression.
@@ -331,19 +349,19 @@ class DTOConverters:
@overload
@staticmethod
- def from_dtos(dtos: list[ColumnDTO]) -> list[Column]: ...
+ def from_dtos(dtos: list[ColumnDTO]) -> list[Column]: ... # pragma: no
cover
@overload
@staticmethod
- def from_dtos(dtos: list[IndexDTO]) -> list[Index]: ...
+ def from_dtos(dtos: list[IndexDTO]) -> list[Index]: ... # pragma: no cover
@overload
@staticmethod
- def from_dtos(dtos: list[SortOrderDTO]) -> list[SortOrder]: ...
+ def from_dtos(dtos: list[SortOrderDTO]) -> list[SortOrder]: ... # pragma:
no cover
@overload
@staticmethod
- def from_dtos(dtos: list[Partitioning]) -> list[Transform]: ...
+ def from_dtos(dtos: list[Partitioning]) -> list[Transform]: ... # pragma:
no cover
@staticmethod
def from_dtos(dtos):
@@ -362,6 +380,22 @@ class DTOConverters:
return []
return [DTOConverters.from_dto(dto) for dto in dtos]
+ @staticmethod
+ def to_function_args(
+ expressions: list[Expression],
+ ) -> list[FunctionArg]:
+ """Converts a list of Expressions to a list of FunctionArg DTOs.
+
+ Args:
+ expressions (list[Expression]): The expressions to be converted.
+
+ Returns:
+ list[FunctionArg]: The list of expression DTOs.
+ """
+ if not expressions:
+ return []
+ return [DTOConverters.to_function_arg(expr) for expr in expressions]
+
@staticmethod
def to_function_arg(expression: Expression) -> FunctionArg:
"""Converts an Expression to an FunctionArg DTO.
@@ -459,3 +493,153 @@ class DTOConverters:
raise IllegalArgumentException(
f"Unsupported partition type: {obj.__class__.__name__}"
)
+
+ @to_dto.register
+ @staticmethod
+ def _(obj: Column) -> ColumnDTO:
+ return (
+ ColumnDTO.builder()
+ .with_name(obj.name())
+ .with_data_type(obj.data_type())
+ .with_comment(obj.comment())
+ .with_nullable(obj.nullable())
+ .with_auto_increment(obj.auto_increment())
+ .with_default_value(
+ Column.DEFAULT_VALUE_NOT_SET
+ if obj.default_value() is None
+ or obj.default_value() is Column.DEFAULT_VALUE_NOT_SET
+ else DTOConverters.to_function_arg(obj.default_value())
+ )
+ .build()
+ )
+
+ @to_dto.register
+ @staticmethod
+ def _(obj: SortOrderDTO) -> SortOrderDTO:
+ return obj
+
+ @to_dto.register
+ @staticmethod
+ def _(obj: SortOrder) -> SortOrderDTO:
+ return SortOrderDTO(
+ sort_term=DTOConverters.to_function_arg(obj.expression()),
+ direction=obj.direction(),
+ null_ordering=obj.null_ordering(),
+ )
+
+ @to_dto.register
+ @staticmethod
+ def _(obj: IndexDTO) -> IndexDTO:
+ return obj
+
+ @to_dto.register
+ @staticmethod
+ def _(obj: Index) -> IndexDTO:
+ return IndexDTO(
+ index_type=obj.type(),
+ name=obj.name(),
+ field_names=obj.field_names(),
+ )
+
+ @to_dto.register
+ @staticmethod
+ def _(obj: Partitioning) -> Partitioning:
+ return obj
+
+ @to_dto.register
+ @staticmethod
+ def _(obj: Transform) -> Partitioning:
+ if isinstance(obj, SingleFieldTransform):
+ transform_class = DTOConverters._SINGLE_FIELD_TRANSFORM_TYPES.get(
+ obj.name()
+ )
+ if transform_class is None:
raise IllegalArgumentException(f"Unsupported transform: {obj.name()}")
+ return transform_class(*obj.field_name())
+ if isinstance(obj, Transforms.BucketTransform):
+ bucket_transform = cast(Transforms.BucketTransform, obj)
+ return BucketPartitioningDTO(
+ bucket_transform.num_buckets(),
+ *bucket_transform.field_names(),
+ )
+ if isinstance(obj, Transforms.TruncateTransform):
+ truncate_transform = cast(Transforms.TruncateTransform, obj)
+ return TruncatePartitioningDTO(
+ truncate_transform.width(),
+ truncate_transform.field_name(),
+ )
+ if isinstance(obj, Transforms.ListTransform):
+ list_transform = cast(Transforms.ListTransform, obj)
+ list_assignments: list[ListPartitionDTO] = [
+ cast(ListPartitionDTO, DTOConverters.to_dto(assignment))
+ for assignment in list_transform.assignments()
+ ]
+ return ListPartitioningDTO(
+ field_names=list_transform.field_names(),
+ assignments=list_assignments,
+ )
+ if isinstance(obj, Transforms.RangeTransform):
+ range_transform = cast(Transforms.RangeTransform, obj)
+ range_assignments: list[RangePartitionDTO] = [
+ cast(RangePartitionDTO, DTOConverters.to_dto(assignment))
+ for assignment in range_transform.assignments()
+ ]
+ return RangePartitioningDTO(
+ field_name=range_transform.field_name(),
+ assignments=range_assignments,
+ )
+ if isinstance(obj, Transforms.ApplyTransform):
+ return FunctionPartitioningDTO(
+ obj.name(), *DTOConverters.to_function_args(obj.arguments())
+ )
+ raise IllegalArgumentException(f"Unsupported transform: {obj.name()}")
+
+ @to_dto.register
+ @staticmethod
+ def _(obj: Distribution) -> DistributionDTO:
+ if obj is None or obj is Distributions.NONE:
+ return DistributionDTO.NONE
+ if isinstance(obj, DistributionDTO):
+ return obj
+ return DistributionDTO(
+ strategy=obj.strategy(),
+ number=obj.number(),
+ args=[
+ DTOConverters.to_function_arg(expression)
+ for expression in obj.expressions()
+ ],
+ )
+
+ @overload
+ @staticmethod
+ def to_dtos(dtos: list[Column]) -> list[ColumnDTO]: ... # pragma: no cover
+
+ @overload
+ @staticmethod
+ def to_dtos(dtos: list[Index]) -> list[IndexDTO]: ... # pragma: no cover
+
+ @overload
+ @staticmethod
+ def to_dtos(dtos: list[IndexDTO]) -> list[IndexDTO]: ... # pragma: no cover
+
+ @overload
+ @staticmethod
+ def to_dtos(dtos: list[SortOrder]) -> list[SortOrderDTO]: ... # pragma: no cover
+
+ @overload
+ @staticmethod
+ def to_dtos(dtos: list[SortOrderDTO]) -> list[SortOrderDTO]: ... # pragma: no cover
+
+ @overload
+ @staticmethod
+ def to_dtos(dtos: list[Transform]) -> list[Partitioning]: ... # pragma: no cover
+
+ @overload
+ @staticmethod
+ def to_dtos(dtos: list[Partitioning]) -> list[Partitioning]: ... # pragma: no cover
+
+ @staticmethod
+ def to_dtos(dtos):
+ if not dtos:
+ return []
+ return [DTOConverters.to_dto(dto) for dto in dtos]
diff --git a/clients/client-python/gravitino/exceptions/base.py
b/clients/client-python/gravitino/exceptions/base.py
index 62422082e2..de7ef01bce 100644
--- a/clients/client-python/gravitino/exceptions/base.py
+++ b/clients/client-python/gravitino/exceptions/base.py
@@ -203,3 +203,7 @@ class NoSuchPartitionException(NotFoundException):
class PartitionAlreadyExistsException(AlreadyExistsException):
"""An exception thrown when a partition with specified name already
exists."""
+
+
+class TableAlreadyExistsException(AlreadyExistsException):
+ """An exception thrown when a table already exists."""
diff --git
a/clients/client-python/gravitino/exceptions/handlers/table_error_handler.py
b/clients/client-python/gravitino/exceptions/handlers/table_error_handler.py
new file mode 100644
index 0000000000..85c4846ecf
--- /dev/null
+++ b/clients/client-python/gravitino/exceptions/handlers/table_error_handler.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from gravitino.constants.error import ErrorConstants
+from gravitino.dto.responses.error_response import ErrorResponse
+from gravitino.exceptions.base import (
+ CatalogNotInUseException,
+ ForbiddenException,
+ IllegalArgumentException,
+ MetalakeNotInUseException,
+ NoSuchSchemaException,
+ NoSuchTableException,
+ NotFoundException,
+ NotInUseException,
+ TableAlreadyExistsException,
+ UnsupportedOperationException,
+)
+from gravitino.exceptions.handlers.rest_error_handler import RestErrorHandler
+
+
+class TableErrorHandler(RestErrorHandler):
+ def handle(self, error_response: ErrorResponse):
+ error_message = error_response.format_error_message()
+ code = ErrorConstants(error_response.code())
+ exception_type = error_response.type()
+
+ if code is ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
+ raise IllegalArgumentException(error_message)
+
+ if code is ErrorConstants.NOT_FOUND_CODE:
+ if exception_type == NoSuchSchemaException.__name__:
+ raise NoSuchSchemaException(error_message)
+ if exception_type == NoSuchTableException.__name__:
+ raise NoSuchTableException(error_message)
+ raise NotFoundException(error_message)
+
+ if code is ErrorConstants.ALREADY_EXISTS_CODE:
+ raise TableAlreadyExistsException(error_message)
+
+ if code is ErrorConstants.INTERNAL_ERROR_CODE:
+ raise RuntimeError(error_message)
+
+ if code is ErrorConstants.UNSUPPORTED_OPERATION_CODE:
+ raise UnsupportedOperationException(error_message)
+
+ if code is ErrorConstants.FORBIDDEN_CODE:
+ raise ForbiddenException(error_message)
+
+ if code is ErrorConstants.NOT_IN_USE_CODE:
+ if exception_type == CatalogNotInUseException.__name__:
+ raise CatalogNotInUseException(error_message)
+ if exception_type == MetalakeNotInUseException.__name__:
+ raise MetalakeNotInUseException(error_message)
+ raise NotInUseException(error_message)
+ super().handle(error_response)
+
+
+TABLE_ERROR_HANDLER = TableErrorHandler()
diff --git
a/clients/client-python/tests/unittests/dto/util/test_dto_converters.py
b/clients/client-python/tests/unittests/dto/util/test_dto_converters.py
index 171f7ee864..dc29c89203 100644
--- a/clients/client-python/tests/unittests/dto/util/test_dto_converters.py
+++ b/clients/client-python/tests/unittests/dto/util/test_dto_converters.py
@@ -31,7 +31,9 @@ from gravitino.api.rel.expressions.literals.literals import
Literals
from gravitino.api.rel.expressions.named_reference import FieldReference
from gravitino.api.rel.expressions.sorts.null_ordering import NullOrdering
from gravitino.api.rel.expressions.sorts.sort_direction import SortDirection
+from gravitino.api.rel.expressions.sorts.sort_order import SortOrder
from gravitino.api.rel.expressions.sorts.sort_orders import SortOrders
+from gravitino.api.rel.expressions.transforms.transform import Transform
from gravitino.api.rel.expressions.transforms.transforms import Transforms
from gravitino.api.rel.expressions.unparsed_expression import
UnparsedExpression
from gravitino.api.rel.indexes.index import Index
@@ -108,6 +110,14 @@ class TestDTOConverters(unittest.TestCase):
.with_unparsed_expression("unparsed")
.build(),
}
+ cls.single_field_transforms = {
+ Transforms.NAME_OF_IDENTITY: Transforms.identity("score"),
+ Transforms.NAME_OF_YEAR: Transforms.year("createTime"),
+ Transforms.NAME_OF_MONTH: Transforms.month("createTime"),
+ Transforms.NAME_OF_DAY: Transforms.day("createTime"),
+ Transforms.NAME_OF_HOUR: Transforms.hour("createTime"),
+ }
+
cls.table_dto_json = """
{
"name": "example_table",
@@ -768,3 +778,209 @@ class TestDTOConverters(unittest.TestCase):
)
converted = DTOConverters.to_dto(partition)
self.assertTrue(converted == expected)
+
+ def test_to_dtos_columns(self):
+ column_names = {f"column_{i}" for i in range(2)}
+ column_data_types = {Types.IntegerType.get(), Types.BooleanType.get()}
+ columns: list[Column] = [
+ Column.of(name=column_name, data_type=column_data_type)
+ for column_name, column_data_type in zip(column_names,
column_data_types)
+ ]
+ expected = [
+ ColumnDTO.builder()
+ .with_name(column.name())
+ .with_data_type(column.data_type())
+ .with_default_value(Column.DEFAULT_VALUE_NOT_SET)
+ .build()
+ for column in columns
+ ]
+ self.assertListEqual(DTOConverters.to_dtos(columns), expected)
+
+ def test_to_dtos_sort_orders(self):
+ directions = {SortDirection.ASCENDING, SortDirection.DESCENDING}
+ null_orderings = {NullOrdering.NULLS_LAST, NullOrdering.NULLS_FIRST}
+ field_names = [
+ [f"score_{i}"] for i in range(len(directions) *
len(null_orderings))
+ ]
+ sort_orders: list[SortOrder] = []
+ expected_dtos: list[SortOrderDTO] = []
+ for field_name, (direction, null_ordering) in zip(
+ field_names, product(directions, null_orderings)
+ ):
+ field_ref = FieldReference(field_names=field_name)
+ sort_orders.append(
+ SortOrders.of(
+ expression=field_ref,
+ direction=direction,
+ null_ordering=null_ordering,
+ )
+ )
+ field_ref_dto = (
+ FieldReferenceDTO.builder()
+ .with_field_name(field_name=field_name)
+ .build()
+ )
+ expected_dtos.append(
+ SortOrderDTO(
+ sort_term=field_ref_dto,
+ direction=direction,
+ null_ordering=null_ordering,
+ )
+ )
+ converted_dtos = DTOConverters.to_dtos(sort_orders)
+ for converted, expected in zip(converted_dtos, expected_dtos):
+ self.assertEqual(converted.sort_term(), expected.sort_term())
+ self.assertEqual(converted.direction(), expected.direction())
+ self.assertEqual(converted.null_ordering(),
expected.null_ordering())
+
+ self.assertListEqual(DTOConverters.to_dtos(converted_dtos),
converted_dtos)
+
+ def test_to_dtos_indexes(self):
+ field_names = [[f"field_{i}"] for i in range(2)]
+
+ indexes: list[Index] = [
+ Indexes.of(index_type, index_type.value, field_names)
+ for index_type in Index.IndexType
+ ]
+ expected_dtos: list[IndexDTO] = [
+ IndexDTO(
+ index_type=index_type,
+ name=index_type.value,
+ field_names=field_names,
+ )
+ for index_type in Index.IndexType
+ ]
+ converted_dtos = DTOConverters.to_dtos(indexes)
+ for converted, expected in zip(converted_dtos, expected_dtos):
+ self.assertEqual(converted.type(), expected.type())
+ self.assertEqual(converted.name(), expected.name())
+ self.assertListEqual(converted.field_names(),
expected.field_names())
+
+ self.assertListEqual(DTOConverters.to_dtos(converted_dtos),
converted_dtos)
+
+ def test_to_dtos_single_field_transforms(self):
+ converted_dtos =
DTOConverters.to_dtos(self.single_field_transforms.values())
+ for key, converted in zip(self.single_field_transforms.keys(),
converted_dtos):
transform_class = DTOConverters._SINGLE_FIELD_TRANSFORM_TYPES[ # pylint: disable=protected-access
+ key
+ ]
+ expected = transform_class(*converted.field_name())
+ self.assertEqual(converted.field_name(), expected.field_name())
+
+ def test_to_dtos_bucket_truncate_transforms(self):
+ num_buckets, width = 10, 5
+ field_name = ["score"]
+ bucket_transform = Transforms.bucket(num_buckets, field_name)
+ trunc_transform = Transforms.truncate(width, field_name)
+ transforms = [bucket_transform, trunc_transform]
+ converted_dtos = DTOConverters.to_dtos(transforms)
+ expected_bucket_dto = BucketPartitioningDTO(num_buckets, field_name)
+ expected_trunc_dto = TruncatePartitioningDTO(width=width,
field_name=field_name)
+ expected_dtos = [expected_bucket_dto, expected_trunc_dto]
+ for converted, expected in zip(converted_dtos, expected_dtos):
+ if isinstance(expected, BucketPartitioningDTO):
+ self.assertEqual(converted.num_buckets(),
expected.num_buckets())
+ self.assertListEqual(converted.field_names(),
expected.field_names())
+ else:
+ self.assertEqual(converted.width(), expected.width())
+ self.assertListEqual(converted.field_name(),
expected.field_name())
+
+ def test_to_dtos_list_range_transforms(self):
+ field_names = [["createTime"], ["city"]]
+ list_transform = Transforms.list(
+ field_names=field_names,
+ assignments=[
+ Partitions.list(
+ name="p0",
+ lists=[
+ [Literals.date_literal(date(2025, 8, 8))],
+ [Literals.string_literal("Los Angeles")],
+ ],
+ properties={},
+ ),
+ ],
+ )
+ range_transform = Transforms.range(
+ field_name=["score"],
+ assignments=[
+ Partitions.range(
+ name="p1",
+ lower=Literals.integer_literal(0),
+ upper=Literals.integer_literal(100),
+ properties={},
+ )
+ ],
+ )
+ transforms = [list_transform, range_transform]
+ converted_dtos = DTOConverters.to_dtos(transforms)
+ expected_list_dto = ListPartitioningDTO(
+ field_names=field_names,
+ assignments=[
+ DTOConverters.to_dto(assignment)
+ for assignment in list_transform.assignments()
+ ],
+ )
+ expected_range_dto = RangePartitioningDTO(
+ field_name=["score"],
+ assignments=[
+ DTOConverters.to_dto(assignment)
+ for assignment in range_transform.assignments()
+ ],
+ )
+ expected_dtos = [expected_list_dto, expected_range_dto]
+ for converted, expected in zip(converted_dtos, expected_dtos):
+ if isinstance(expected, ListPartitioningDTO):
+ self.assertListEqual(converted.field_names(),
expected.field_names())
+ else:
+ self.assertListEqual(converted.field_name(),
expected.field_name())
+ self.assertListEqual(converted.assignments(),
expected.assignments())
+
+ def test_to_dtos_apply_transform(self):
+ function_name = "test_function"
+ args: list[FunctionArg] = [
+ LiteralDTO.builder()
+ .with_data_type(Types.IntegerType.get())
+ .with_value("-1")
+ .build(),
+ LiteralDTO.builder()
+ .with_data_type(Types.BooleanType.get())
+ .with_value("True")
+ .build(),
+ ]
+ apply_transform = Transforms.apply(
+ name=function_name,
+ arguments=[
+ Literals.of(value="-1", data_type=Types.IntegerType.get()),
+ Literals.of(value="True", data_type=Types.BooleanType.get()),
+ ],
+ )
+ expected = FunctionPartitioningDTO(function_name, *args)
+ converted = DTOConverters.to_dto(apply_transform)
+ self.assertEqual(converted.function_name(), expected.function_name())
+ self.assertListEqual(converted.args(), expected.args())
+
+ def test_to_dtos_raise_exception(self):
with self.assertRaisesRegex(IllegalArgumentException, "Unsupported transform"):
+ DTOConverters.to_dto(
+ cast(Transform, MagicMock(name="UnsupportedTransform", spec=Transform))
+ )
+
+ def test_to_dto_distribution(self):
+ field_names = [f"field_{i}" for i in range(2)]
+ field_ref_dtos = [
+
FieldReferenceDTO.builder().with_field_name(field_name=[field_name]).build()
+ for field_name in field_names
+ ]
+ distribution = Distributions.of(
+ Strategy.HASH,
+ 4,
+ *[FieldReference(field_names=[field_name]) for field_name in
field_names],
+ )
+ distribution_dto = DistributionDTO(
+ strategy=Strategy.HASH, number=4, args=field_ref_dtos
+ )
+ converted = DTOConverters.to_dto(distribution)
+ self.assertEqual(converted, distribution_dto)
+
+ self.assertEqual(DTOConverters.to_dto(Distributions.NONE),
DistributionDTO.NONE)
+ self.assertEqual(DTOConverters.to_dto(distribution_dto),
distribution_dto)
diff --git a/clients/client-python/tests/unittests/test_error_handler.py
b/clients/client-python/tests/unittests/test_error_handler.py
index 291daa6d49..a0133a7a97 100644
--- a/clients/client-python/tests/unittests/test_error_handler.py
+++ b/clients/client-python/tests/unittests/test_error_handler.py
@@ -23,6 +23,7 @@ from gravitino.exceptions.base import (
CatalogAlreadyExistsException,
CatalogNotInUseException,
ConnectionFailedException,
+ ForbiddenException,
IllegalArgumentException,
InternalError,
MetalakeAlreadyExistsException,
@@ -41,6 +42,7 @@ from gravitino.exceptions.base import (
PartitionAlreadyExistsException,
RESTException,
SchemaAlreadyExistsException,
+ TableAlreadyExistsException,
UnsupportedOperationException,
)
from gravitino.exceptions.handlers.catalog_error_handler import
CATALOG_ERROR_HANDLER
@@ -54,6 +56,7 @@ from gravitino.exceptions.handlers.partition_error_handler
import (
)
from gravitino.exceptions.handlers.rest_error_handler import REST_ERROR_HANDLER
from gravitino.exceptions.handlers.schema_error_handler import
SCHEMA_ERROR_HANDLER
+from gravitino.exceptions.handlers.table_error_handler import
TABLE_ERROR_HANDLER
class TestErrorHandler(unittest.TestCase):
@@ -340,3 +343,78 @@ class TestErrorHandler(unittest.TestCase):
PARTITION_ERROR_HANDLER.handle(
ErrorResponse.generate_error_response(Exception, "mock error")
)
+
+ def test_table_error_handler(self):
+ with self.assertRaises(IllegalArgumentException):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(
+ IllegalArgumentException, "mock error"
+ )
+ )
+
+ with self.assertRaises(NoSuchSchemaException):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(
+ NoSuchSchemaException, "mock error"
+ )
+ )
+
+ with self.assertRaises(NoSuchTableException):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(
+ NoSuchTableException, "mock error"
+ )
+ )
+
+ with self.assertRaises(NotFoundException):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(NotFoundException, "mock
error")
+ )
+
+ with self.assertRaises(TableAlreadyExistsException):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(
+ TableAlreadyExistsException, "mock error"
+ )
+ )
+
+ with self.assertRaises(RuntimeError):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(RuntimeError, "mock
error")
+ )
+
+ with self.assertRaises(UnsupportedOperationException):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(
+ UnsupportedOperationException, "mock error"
+ )
+ )
+
+ with self.assertRaises(ForbiddenException):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(ForbiddenException,
"mock error")
+ )
+
+ with self.assertRaises(CatalogNotInUseException):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(
+ CatalogNotInUseException, "mock error"
+ )
+ )
+
+ with self.assertRaises(MetalakeNotInUseException):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(
+ MetalakeNotInUseException, "mock error"
+ )
+ )
+
+ with self.assertRaises(NotInUseException):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(NotInUseException, "mock
error")
+ )
+
+ with self.assertRaises(RESTException):
+ TABLE_ERROR_HANDLER.handle(
+ ErrorResponse.generate_error_response(Exception, "mock error")
+ )
diff --git a/clients/client-python/tests/unittests/test_relational_catalog.py
b/clients/client-python/tests/unittests/test_relational_catalog.py
new file mode 100644
index 0000000000..dedf92d0d0
--- /dev/null
+++ b/clients/client-python/tests/unittests/test_relational_catalog.py
@@ -0,0 +1,278 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from http.client import HTTPResponse
+from unittest.mock import Mock, patch
+
+from gravitino.client.relational_catalog import RelationalCatalog
+from gravitino.dto.audit_dto import AuditDTO
+from gravitino.dto.rel.distribution_dto import DistributionDTO
+from gravitino.dto.rel.table_dto import TableDTO
+from gravitino.dto.responses.entity_list_response import EntityListResponse
+from gravitino.dto.responses.table_response import TableResponse
+from gravitino.dto.util.dto_converters import DTOConverters
+from gravitino.exceptions.base import (
+ NoSuchSchemaException,
+ NoSuchTableException,
+ TableAlreadyExistsException,
+)
+from gravitino.name_identifier import NameIdentifier
+from gravitino.namespace import Namespace
+from gravitino.utils import HTTPClient, Response
+
+
+class TestRelationalCatalog(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls) -> None:
+ cls.metalake_name = "test_metalake"
+ cls.catalog_name = "test_catalog"
+ cls.schema_name = "test_schema"
+ cls.table_name = "test_table"
+ cls.catalog_namespace = Namespace.of(cls.metalake_name)
+ cls.table_identifier = NameIdentifier.of(cls.schema_name,
cls.table_name)
+ cls.rest_client = HTTPClient("http://localhost:8090")
+ cls.catalog = RelationalCatalog(
+ catalog_namespace=cls.catalog_namespace,
+ name=cls.catalog_name,
+ catalog_type=RelationalCatalog.Type.RELATIONAL,
+ provider="test_provider",
+ audit=AuditDTO("anonymous"),
+ rest_client=cls.rest_client,
+ )
+ cls.TABLE_DTO_JSON_STRING = """
+ {
+ "name": "example_table",
+ "comment": "This is an example table",
+ "audit": {
+ "creator": "Apache Gravitino",
+ "createTime":"2025-10-10T00:00:00"
+ },
+ "columns": [
+ {
+ "name": "id",
+ "type": "integer",
+ "comment": "id column comment",
+ "nullable": false,
+ "autoIncrement": true,
+ "defaultValue": {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "-1"
+ }
+ },
+ {
+ "name": "name",
+ "type": "varchar(500)",
+ "comment": "name column comment",
+ "nullable": true,
+ "autoIncrement": false,
+ "defaultValue": {
+ "type": "literal",
+ "dataType": "null",
+ "value": "null"
+ }
+ },
+ {
+ "name": "StartingDate",
+ "type": "timestamp",
+ "comment": "StartingDate column comment",
+ "nullable": false,
+ "autoIncrement": false,
+ "defaultValue": {
+ "type": "function",
+ "funcName": "current_timestamp",
+ "funcArgs": []
+ }
+ },
+ {
+ "name": "info",
+ "type": {
+ "type": "struct",
+ "fields": [
+ {
+ "name": "position",
+ "type": "string",
+ "nullable": true,
+ "comment": "position field comment"
+ },
+ {
+ "name": "contact",
+ "type": {
+ "type": "list",
+ "elementType": "integer",
+ "containsNull": false
+ },
+ "nullable": true,
+ "comment": "contact field comment"
+ },
+ {
+ "name": "rating",
+ "type": {
+ "type": "map",
+ "keyType": "string",
+ "valueType": "integer",
+ "valueContainsNull": false
+ },
+ "nullable": true,
+ "comment": "rating field comment"
+ }
+ ]
+ },
+ "comment": "info column comment",
+ "nullable": true
+ },
+ {
+ "name": "dt",
+ "type": "date",
+ "comment": "dt column comment",
+ "nullable": true
+ }
+ ],
+ "partitioning": [
+ {
+ "strategy": "identity",
+ "fieldName": [ "dt" ]
+ }
+ ],
+ "distribution": {
+ "strategy": "hash",
+ "number": 32,
+ "funcArgs": [
+ {
+ "type": "field",
+ "fieldName": [ "id" ]
+ }
+ ]
+ },
+ "sortOrders": [
+ {
+ "sortTerm": {
+ "type": "field",
+ "fieldName": [ "StartingDate" ]
+ },
+ "direction": "asc",
+ "nullOrdering": "nulls_first"
+ }
+ ],
+ "indexes": [
+ {
+ "indexType": "primary_key",
+ "name": "PRIMARY",
+ "fieldNames": [["id"]]
+ }
+ ],
+ "properties": {
+ "format": "ORC"
+ }
+ }
+ """
+ cls.table_dto = TableDTO.from_json(cls.TABLE_DTO_JSON_STRING)
+
+ def _get_mock_http_resp(self, json_str: str, return_code: int = 200):
+ mock_http_resp = Mock(HTTPResponse)
+ mock_http_resp.getcode.return_value = return_code
+ mock_http_resp.read.return_value = json_str
+ mock_http_resp.info.return_value = None
+ mock_http_resp.url = None
+ mock_resp = Response(mock_http_resp)
+ return mock_resp
+
+ def test_as_table_catalog(self):
+ table_catalog = self.catalog.as_table_catalog()
+ self.assertIs(table_catalog, self.catalog)
+
+ def test_create_table(self):
+ resp_body = TableResponse(0, self.table_dto)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.post",
+ return_value=mock_resp,
+ ):
+ table = self.catalog.create_table(
+ identifier=self.table_identifier,
+ columns=DTOConverters.from_dtos(self.table_dto.columns()),
+
partitioning=DTOConverters.from_dtos(self.table_dto.partitioning()),
+ distribution=DTOConverters.from_dto(
+ self.table_dto.distribution() or DistributionDTO.NONE
+ ),
+
sort_orders=DTOConverters.from_dtos(self.table_dto.sort_order()),
+ indexes=DTOConverters.from_dtos(self.table_dto.index()),
+ properties=self.table_dto.properties(),
+ )
+ self.assertEqual(table.name(), self.table_dto.name())
+
+ def test_load_table(self):
+ resp_body = TableResponse(0, self.table_dto)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
+ return_value=mock_resp,
+ ):
+ table = self.catalog.load_table(self.table_identifier)
+ self.assertEqual(table.name(), self.table_dto.name())
+
+ def test_list_tables(self):
+ resp_body = EntityListResponse(
+ 0,
+ [
+ NameIdentifier.of(
+ self.metalake_name,
+ self.catalog_name,
+ self.schema_name,
+ self.table_name,
+ )
+ ],
+ )
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
+ return_value=mock_resp,
+ ):
+ tables =
self.catalog.list_tables(namespace=Namespace.of(self.schema_name))
+ self.assertEqual(len(tables), 1)
+ self.assertEqual(tables[0], self.table_identifier)
+
+ def test_load_table_not_exists(self):
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
+ side_effect=NoSuchTableException("Table not found"),
+ ):
+ with self.assertRaises(NoSuchTableException):
+ self.catalog.load_table(self.table_identifier)
+
+ def test_create_table_already_exists(self):
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.post",
+ side_effect=TableAlreadyExistsException("Table already exists"),
+ ):
+ with self.assertRaises(TableAlreadyExistsException):
+ self.catalog.create_table(
+ identifier=self.table_identifier,
+ columns=DTOConverters.from_dtos(self.table_dto.columns()),
+ )
+
+ def test_list_tables_invalid_namespace(self):
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
+ side_effect=NoSuchSchemaException("Schema not found"),
+ ):
+ with self.assertRaises(NoSuchSchemaException):
+
self.catalog.list_tables(namespace=Namespace.of("invalid_schema"))
diff --git a/clients/client-python/tests/unittests/test_requests.py
b/clients/client-python/tests/unittests/test_requests.py
index 260855965b..66d799a71b 100644
--- a/clients/client-python/tests/unittests/test_requests.py
+++ b/clients/client-python/tests/unittests/test_requests.py
@@ -20,6 +20,7 @@ import unittest
from typing import cast
from gravitino.dto.requests.add_partitions_request import AddPartitionsRequest
+from gravitino.dto.requests.table_create_request import TableCreateRequest
from gravitino.exceptions.base import IllegalArgumentException
@@ -39,3 +40,178 @@ class TestRequests(unittest.TestCase):
with self.assertRaisesRegex(IllegalArgumentException,
exception_str):
req = AddPartitionsRequest.from_json(json_str)
req.validate()
+
+ def test_table_create_request(self):
+ json_str = """
+ {
+ "name": "example_table",
+ "comment": "This is an example table",
+ "columns": [
+ {
+ "name": "id",
+ "type": "integer",
+ "comment": "id column comment",
+ "nullable": false,
+ "autoIncrement": true,
+ "defaultValue": {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "-1"
+ }
+ },
+ {
+ "name": "name",
+ "type": "varchar(500)",
+ "comment": "name column comment",
+ "nullable": true,
+ "autoIncrement": false,
+ "defaultValue": {
+ "type": "literal",
+ "dataType": "null",
+ "value": "null"
+ }
+ },
+ {
+ "name": "StartingDate",
+ "type": "timestamp",
+ "comment": "StartingDate column comment",
+ "nullable": false,
+ "autoIncrement": false,
+ "defaultValue": {
+ "type": "function",
+ "funcName": "current_timestamp",
+ "funcArgs": []
+ }
+ },
+ {
+ "name": "info",
+ "type": {
+ "type": "struct",
+ "fields": [
+ {
+ "name": "position",
+ "type": "string",
+ "nullable": true,
+ "comment": "position field comment"
+ },
+ {
+ "name": "contact",
+ "type": {
+ "type": "list",
+ "elementType": "integer",
+ "containsNull": false
+ },
+ "nullable": true,
+ "comment": "contact field comment"
+ },
+ {
+ "name": "rating",
+ "type": {
+ "type": "map",
+ "keyType": "string",
+ "valueType": "integer",
+ "valueContainsNull": false
+ },
+ "nullable": true,
+ "comment": "rating field comment"
+ }
+ ]
+ },
+ "comment": "info column comment",
+ "nullable": true
+ },
+ {
+ "name": "dt",
+ "type": "date",
+ "comment": "dt column comment",
+ "nullable": true
+ },
+ {
+ "name": "age",
+ "type": "integer",
+ "comment": "age column comment",
+ "nullable": true
+ }
+ ],
+ "partitioning": [
+ {
+ "strategy": "identity",
+ "fieldName": [ "dt" ]
+ }
+ ],
+ "distribution": {
+ "strategy": "hash",
+ "number": 32,
+ "funcArgs": [
+ {
+ "type": "field",
+ "fieldName": [ "id" ]
+ }
+ ]
+ },
+ "sortOrders": [
+ {
+ "sortTerm": {
+ "type": "field",
+ "fieldName": [ "age" ]
+ },
+ "direction": "asc",
+ "nullOrdering": "nulls_first"
+ }
+ ],
+ "indexes": [
+ {
+ "indexType": "primary_key",
+ "name": "PRIMARY",
+ "fieldNames": [["id"]]
+ }
+ ],
+ "properties": {
+ "format": "ORC"
+ }
+ }
+ """
+
+ req = TableCreateRequest.from_json(json_str)
+ req.validate()
+
+ multiple_auto_increment_json_str = """
+ {
+ "name": "example_table",
+ "comment": "This is an example table",
+ "columns": [
+ {
+ "name": "id",
+ "type": "integer",
+ "comment": "id column comment",
+ "nullable": false,
+ "autoIncrement": true,
+ "defaultValue": {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "-1"
+ }
+ },
+ {
+ "name": "age",
+ "type": "integer",
+ "comment": "age column comment",
+ "nullable": false,
+ "autoIncrement": true,
+ "defaultValue": {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "-1"
+ }
+ }
+ ]
+ }
+ """
+ exceptions = {
+ '"name" field is required and cannot be empty':
'{"name":"","columns":[]}',
+ "Only one column can be auto-incremented.":
multiple_auto_increment_json_str,
+ }
+ for exception_str, json_str in exceptions.items():
+ with self.assertRaisesRegex(IllegalArgumentException,
exception_str):
+ req = TableCreateRequest.from_json(json_str)
+ req.validate()
diff --git a/clients/client-python/tests/unittests/test_responses.py
b/clients/client-python/tests/unittests/test_responses.py
index de6505cfcd..51fc4aa340 100644
--- a/clients/client-python/tests/unittests/test_responses.py
+++ b/clients/client-python/tests/unittests/test_responses.py
@@ -31,6 +31,7 @@ from gravitino.dto.responses.partition_name_list_response import (
PartitionNameListResponse,
)
from gravitino.dto.responses.partition_response import PartitionResponse
+from gravitino.dto.responses.table_response import TableResponse
from gravitino.exceptions.base import IllegalArgumentException
@@ -67,6 +68,133 @@ class TestResponses(unittest.TestCase):
}
}
"""
+ cls.TABLE_JSON_STRING = """
+ {
+ "name": "example_table",
+ "comment": "This is an example table",
+ "audit": {
+ "creator": "anonymous",
+ "createTime":"2025-10-10T00:00:00"
+ },
+ "columns": [
+ {
+ "name": "id",
+ "type": "integer",
+ "comment": "id column comment",
+ "nullable": false,
+ "autoIncrement": true,
+ "defaultValue": {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "-1"
+ }
+ },
+ {
+ "name": "name",
+ "type": "varchar(500)",
+ "comment": "name column comment",
+ "nullable": true,
+ "autoIncrement": false,
+ "defaultValue": {
+ "type": "literal",
+ "dataType": "null",
+ "value": "null"
+ }
+ },
+ {
+ "name": "StartingDate",
+ "type": "timestamp",
+ "comment": "StartingDate column comment",
+ "nullable": false,
+ "autoIncrement": false,
+ "defaultValue": {
+ "type": "function",
+ "funcName": "current_timestamp",
+ "funcArgs": []
+ }
+ },
+ {
+ "name": "info",
+ "type": {
+ "type": "struct",
+ "fields": [
+ {
+ "name": "position",
+ "type": "string",
+ "nullable": true,
+ "comment": "position field comment"
+ },
+ {
+ "name": "contact",
+ "type": {
+ "type": "list",
+ "elementType": "integer",
+ "containsNull": false
+ },
+ "nullable": true,
+ "comment": "contact field comment"
+ },
+ {
+ "name": "rating",
+ "type": {
+ "type": "map",
+ "keyType": "string",
+ "valueType": "integer",
+ "valueContainsNull": false
+ },
+ "nullable": true,
+ "comment": "rating field comment"
+ }
+ ]
+ },
+ "comment": "info column comment",
+ "nullable": true
+ },
+ {
+ "name": "dt",
+ "type": "date",
+ "comment": "dt column comment",
+ "nullable": true
+ }
+ ],
+ "partitioning": [
+ {
+ "strategy": "identity",
+ "fieldName": [ "dt" ]
+ }
+ ],
+ "distribution": {
+ "strategy": "hash",
+ "number": 32,
+ "funcArgs": [
+ {
+ "type": "field",
+ "fieldName": [ "id" ]
+ }
+ ]
+ },
+ "sortOrders": [
+ {
+ "sortTerm": {
+ "type": "field",
+ "fieldName": [ "id" ]
+ },
+ "direction": "asc",
+ "nullOrdering": "nulls_first"
+ }
+ ],
+ "indexes": [
+ {
+ "indexType": "primary_key",
+ "name": "PRIMARY",
+ "fieldNames": [["id"]]
+ }
+ ],
+ "properties": {
+ "format": "ORC"
+ }
+ }
+ """
def test_file_location_response(self):
json_data = {"code": 0, "fileLocation": "file:/test/1"}
@@ -361,3 +489,13 @@ class TestResponses(unittest.TestCase):
resp: PartitionListResponse = PartitionListResponse.from_json(json_string)
resp.validate()
self.assertListEqual(resp.get_partitions(), partitions)
+
+ def test_table_response(self):
+ json_string = f"""
+ {{
+ "code": 0,
+ "table": {TestResponses.TABLE_JSON_STRING}
+ }}
+ """
+ resp: TableResponse = TableResponse.from_json(json_string)
+ resp.validate()