This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new a39624cf31 [#8795] feat(client-python): add from_dto for TableDTO in
class DTOConverters (#8841)
a39624cf31 is described below
commit a39624cf31db71ea4850da1cc1ce4272709351e4
Author: George T. C. Lai <[email protected]>
AuthorDate: Wed Nov 5 14:59:05 2025 +0800
[#8795] feat(client-python): add from_dto for TableDTO in class
DTOConverters (#8841)
### What changes were proposed in this pull request?
This PR implements the method `from_dto()` for `TableDTO` in class
`DTOConverters`. The following methods are implemented as well because the
`TableDTO` conversion depends on them.
- `from_dto` for `DistributionDTO`
- `from_dto` for `IndexDTO`
- `from_dto` for `SortOrderDTO`
- `from_dto` for `ColumnDTO`
- `from_dto` for `Partitioning`
- `from_dto` for `TableDTO`
- `from_dtos` for list of `IndexDTO`
- `from_dtos` for list of `SortOrderDTO`
- `from_dtos` for list of `ColumnDTO`
- `from_dtos` for list of `Partitioning`
### Why are the changes needed?
`RelationalTable` calls the method `from_dto()` for `TableDTO`, so that
method must be in place before `RelationalTable` can be implemented.
Fix: #8795
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
---------
Signed-off-by: George T. C. Lai <[email protected]>
---
.../api/rel/expressions/transforms/transforms.py | 20 +-
.../gravitino/api/rel/indexes/indexes.py | 6 +
.../gravitino/client/relational_table.py | 79 ++++
.../gravitino/dto/util/dto_converters.py | 228 ++++++++++-
.../tests/unittests/api/rel/test_transforms.py | 2 +-
.../unittests/dto/util/test_dto_converters.py | 444 +++++++++++++++++++++
6 files changed, 772 insertions(+), 7 deletions(-)
diff --git
a/clients/client-python/gravitino/api/rel/expressions/transforms/transforms.py
b/clients/client-python/gravitino/api/rel/expressions/transforms/transforms.py
index a928e241d3..35e4233ea0 100644
---
a/clients/client-python/gravitino/api/rel/expressions/transforms/transforms.py
+++
b/clients/client-python/gravitino/api/rel/expressions/transforms/transforms.py
@@ -241,21 +241,31 @@ class Transforms(Transform):
return Transforms.ApplyTransform(name=name, arguments=arguments)
+ @overload
+ @staticmethod
+ def list(*field_names: List[str]) -> "Transforms.ListTransform": ...
+
+ @overload
@staticmethod
def list(
- *field_names: List[str], assignments: Optional[List[ListPartition]] =
None
- ) -> "Transforms.ListTransform":
- """Create a transform that includes multiple fields in a list.
+ *, field_names: List[List[str]], assignments: List[ListPartition]
+ ) -> "Transforms.ListTransform": ...
+
+ @staticmethod
+ def list(*args, **kwargs) -> "Transforms.ListTransform":
+ """Create a transform that includes multiple fields in a list with
preassigned list partitions.
Args:
- *fields (List[NamedReference]):
- The fields to include in the list
+ field_names (List[str] or List[List[str]]):
+ The field names to include in the list
assignments (Optional[List[ListPartition]]):
The preassigned list partitions
Returns:
Transforms.ListTransform: The created transform
"""
+ field_names = kwargs.get("field_names", args)
+ assignments = kwargs.get("assignments")
return Transforms.ListTransform(
fields=[
NamedReference.field(field_name=field_name)
diff --git a/clients/client-python/gravitino/api/rel/indexes/indexes.py
b/clients/client-python/gravitino/api/rel/indexes/indexes.py
index 7862bc342d..8e118434d5 100644
--- a/clients/client-python/gravitino/api/rel/indexes/indexes.py
+++ b/clients/client-python/gravitino/api/rel/indexes/indexes.py
@@ -35,6 +35,12 @@ class Indexes:
EMPTY_INDEXES: ClassVar[List[Index]] = []
DEFAULT_MYSQL_PRIMARY_KEY_NAME: ClassVar[str] = "PRIMARY"
+ @staticmethod
+ def of(
+ index_type: Index.IndexType, name: Optional[str], field_names:
List[List[str]]
+ ) -> Index:
+ return Indexes.IndexImpl(index_type, name, field_names)
+
@staticmethod
def unique(name: str, field_names: List[List[str]]) -> Index:
return Indexes.IndexImpl(Index.IndexType.UNIQUE_KEY, name, field_names)
diff --git a/clients/client-python/gravitino/client/relational_table.py
b/clients/client-python/gravitino/client/relational_table.py
new file mode 100644
index 0000000000..2d02d5e6a7
--- /dev/null
+++ b/clients/client-python/gravitino/client/relational_table.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from gravitino.api.audit import Audit
+from gravitino.api.rel.column import Column
+from gravitino.api.rel.expressions.distributions.distribution import
Distribution
+from gravitino.api.rel.expressions.sorts.sort_order import SortOrder
+from gravitino.api.rel.expressions.transforms.transform import Transform
+from gravitino.api.rel.indexes.index import Index
+from gravitino.api.rel.table import Table
+
+
+class RelationalTable(Table): # pylint: disable=too-many-instance-attributes
+ """A generic table implementation."""
+
+ def __init__(
+ self,
+ name: str,
+ columns: list[Column],
+ partitioning: list[Transform],
+ sort_order: list[SortOrder],
+ distribution: Distribution,
+ index: list[Index],
+ comment: Optional[str],
+ properties: dict[str, str],
+ audit_info: Audit,
+ ):
+ self._name = name
+ self._columns = columns
+ self._partitioning = partitioning
+ self._sort_order = sort_order
+ self._distribution = distribution
+ self._index = index
+ self._comment = comment
+ self._properties = properties
+ self._audit_info = audit_info
+
+ def name(self) -> str:
+ return self._name
+
+ def columns(self) -> list[Column]:
+ return self._columns
+
+ def partitioning(self) -> list[Transform]:
+ return self._partitioning
+
+ def sort_order(self) -> list[SortOrder]:
+ return self._sort_order
+
+ def distribution(self) -> Distribution:
+ return self._distribution
+
+ def index(self) -> list[Index]:
+ return self._index
+
+ def comment(self) -> Optional[str]:
+ return self._comment
+
+ def properties(self) -> dict[str, str]:
+ return self._properties
+
+ def audit_info(self) -> Audit:
+ return self._audit_info
diff --git a/clients/client-python/gravitino/dto/util/dto_converters.py
b/clients/client-python/gravitino/dto/util/dto_converters.py
index 5394a01677..e2a781e19a 100644
--- a/clients/client-python/gravitino/dto/util/dto_converters.py
+++ b/clients/client-python/gravitino/dto/util/dto_converters.py
@@ -15,19 +15,51 @@
# specific language governing permissions and limitations
# under the License.
-from typing import cast
+from functools import singledispatchmethod
+from typing import cast, overload
+from gravitino.api.rel.column import Column
+from gravitino.api.rel.expressions.distributions.distribution import
Distribution
+from gravitino.api.rel.expressions.distributions.distributions import
Distributions
from gravitino.api.rel.expressions.expression import Expression
from gravitino.api.rel.expressions.function_expression import
FunctionExpression
from gravitino.api.rel.expressions.literals.literals import Literals
from gravitino.api.rel.expressions.named_reference import NamedReference
+from gravitino.api.rel.expressions.sorts.sort_order import SortOrder
+from gravitino.api.rel.expressions.sorts.sort_orders import SortOrders
+from gravitino.api.rel.expressions.transforms.transform import Transform
+from gravitino.api.rel.expressions.transforms.transforms import Transforms
from gravitino.api.rel.expressions.unparsed_expression import
UnparsedExpression
+from gravitino.api.rel.indexes.index import Index
+from gravitino.api.rel.indexes.indexes import Indexes
+from gravitino.api.rel.partitions.list_partition import ListPartition
+from gravitino.api.rel.partitions.range_partition import RangePartition
+from gravitino.api.rel.table import Table
from gravitino.api.rel.types.types import Types
+from gravitino.client.relational_table import RelationalTable
+from gravitino.dto.rel.column_dto import ColumnDTO
+from gravitino.dto.rel.distribution_dto import DistributionDTO
from gravitino.dto.rel.expressions.field_reference_dto import FieldReferenceDTO
from gravitino.dto.rel.expressions.func_expression_dto import FuncExpressionDTO
from gravitino.dto.rel.expressions.function_arg import FunctionArg
from gravitino.dto.rel.expressions.literal_dto import LiteralDTO
from gravitino.dto.rel.expressions.unparsed_expression_dto import
UnparsedExpressionDTO
+from gravitino.dto.rel.indexes.index_dto import IndexDTO
+from gravitino.dto.rel.partitioning.bucket_partitioning_dto import
BucketPartitioningDTO
+from gravitino.dto.rel.partitioning.function_partitioning_dto import (
+ FunctionPartitioningDTO,
+)
+from gravitino.dto.rel.partitioning.list_partitioning_dto import
ListPartitioningDTO
+from gravitino.dto.rel.partitioning.partitioning import (
+ Partitioning,
+ SingleFieldPartitioning,
+)
+from gravitino.dto.rel.partitioning.range_partitioning_dto import
RangePartitioningDTO
+from gravitino.dto.rel.partitioning.truncate_partitioning_dto import (
+ TruncatePartitioningDTO,
+)
+from gravitino.dto.rel.sort_order_dto import SortOrderDTO
+from gravitino.dto.rel.table_dto import TableDTO
from gravitino.exceptions.base import IllegalArgumentException
@@ -76,3 +108,197 @@ class DTOConverters:
if not args:
return Expression.EMPTY_EXPRESSION
return [DTOConverters.from_function_arg(arg) for arg in args]
+
+ @singledispatchmethod
+ @staticmethod
+ def from_dto(dto) -> object:
+ raise IllegalArgumentException(f"Unsupported DTO type: {type(dto)}")
+
+ @from_dto.register
+ @staticmethod
+ def _(dto: DistributionDTO) -> Distribution:
+ """Converts a DistributionDTO to a Distribution.
+
+ Args:
+ dto (DistributionDTO): The distribution DTO.
+
+ Returns:
+ Distribution: The distribution.
+ """
+ if dto is None or DistributionDTO.NONE.equals(dto):
+ return Distributions.NONE
+
+ return Distributions.of(
+ dto.strategy(),
+ dto.number(),
+ *DTOConverters.from_function_args(dto.args()),
+ )
+
+ @from_dto.register
+ @staticmethod
+ def _(dto: IndexDTO) -> Index:
+ """Converts an IndexDTO to an Index.
+
+ Args:
+ dto (IndexDTO): The Index DTO to be converted.
+
+ Returns:
+ Index: The index.
+ """
+ return Indexes.of(dto.type(), dto.name(), dto.field_names())
+
+ @from_dto.register
+ @staticmethod
+ def _(dto: SortOrderDTO) -> SortOrder:
+ """Converts a SortOrderDTO to a SortOrder.
+
+ Args:
+ dto (SortOrderDTO): The sort order DTO to be converted.
+
+ Returns:
+ SortOrder: The sort order.
+ """
+ return SortOrders.of(
+ DTOConverters.from_function_arg(dto.sort_term()),
+ dto.direction(),
+ dto.null_ordering(),
+ )
+
+ @from_dto.register
+ @staticmethod
+ def _(dto: ColumnDTO) -> Column:
+ """Converts a ColumnDTO to a Column.
+
+ Args:
+ dto (ColumnDTO): The column DTO to be converted.
+
+ Returns:
+ Column: The column.
+ """
+ if dto.default_value() == Column.DEFAULT_VALUE_NOT_SET:
+ return dto
+ return Column.of(
+ dto.name(),
+ dto.data_type(),
+ dto.comment(),
+ dto.nullable(),
+ dto.auto_increment(),
+ DTOConverters.from_function_arg(dto.default_value()),
+ )
+
+ @from_dto.register
+ @staticmethod
+ def _(dto: Partitioning) -> Transform: # pylint:
disable=too-many-return-statements
+ """Converts a partitioning DTO to a Transform.
+
+ Args:
+ dto (Partitioning): The partitioning DTO to be converted.
+
+ Returns:
+ Transform: The transform.
+ """
+ strategy = dto.strategy()
+ if strategy is Partitioning.Strategy.IDENTITY:
+ return Transforms.identity(cast(SingleFieldPartitioning,
dto).field_name())
+ if strategy is Partitioning.Strategy.YEAR:
+ return Transforms.year(cast(SingleFieldPartitioning,
dto).field_name())
+ if strategy is Partitioning.Strategy.MONTH:
+ return Transforms.month(cast(SingleFieldPartitioning,
dto).field_name())
+ if strategy is Partitioning.Strategy.DAY:
+ return Transforms.day(cast(SingleFieldPartitioning,
dto).field_name())
+ if strategy is Partitioning.Strategy.HOUR:
+ return Transforms.hour(cast(SingleFieldPartitioning,
dto).field_name())
+ if strategy is Partitioning.Strategy.BUCKET:
+ bucket_partitioning_dto = cast(BucketPartitioningDTO, dto)
+ return Transforms.bucket(
+ bucket_partitioning_dto.num_buckets(),
+ *bucket_partitioning_dto.field_names(),
+ )
+ if strategy is Partitioning.Strategy.TRUNCATE:
+ truncate_partitioning_dto = cast(TruncatePartitioningDTO, dto)
+ return Transforms.truncate(
+ truncate_partitioning_dto.width(),
+ truncate_partitioning_dto.field_name(),
+ )
+ if strategy is Partitioning.Strategy.LIST:
+ list_partitioning_dto = cast(ListPartitioningDTO, dto)
+ return Transforms.list(
+ field_names=list_partitioning_dto.field_names(),
+ assignments=[
+ cast(ListPartition, DTOConverters.from_dto(p))
+ for p in list_partitioning_dto.assignments()
+ ],
+ )
+ if strategy is Partitioning.Strategy.RANGE:
+ range_partitioning_dto = cast(RangePartitioningDTO, dto)
+ return Transforms.range(
+ range_partitioning_dto.field_name(),
+ [
+ cast(RangePartition, DTOConverters.from_dto(p))
+ for p in range_partitioning_dto.assignments()
+ ],
+ )
+ if strategy is Partitioning.Strategy.FUNCTION:
+ function_partitioning_dto = cast(FunctionPartitioningDTO, dto)
+ return Transforms.apply(
+ function_partitioning_dto.function_name(),
+
DTOConverters.from_function_args(function_partitioning_dto.args()),
+ )
+ raise IllegalArgumentException(f"Unsupported partitioning: {strategy}")
+
+ @from_dto.register
+ @staticmethod
+ def _(dto: TableDTO) -> Table:
+ """Converts a TableDTO to a Table.
+
+ Args:
+ dto (TableDTO): The table DTO to be converted.
+
+ Returns:
+ Table: The table.
+ """
+
+ return RelationalTable(
+ name=dto.name(),
+ columns=DTOConverters.from_dtos(dto.columns()),
+ partitioning=DTOConverters.from_dtos(dto.partitioning()),
+ sort_order=DTOConverters.from_dtos(dto.sort_order()),
+ distribution=DTOConverters.from_dto(dto.distribution()),
+ index=DTOConverters.from_dtos(dto.index()),
+ comment=dto.comment(),
+ properties=dto.properties(),
+ audit_info=dto.audit_info(),
+ )
+
+ @overload
+ @staticmethod
+ def from_dtos(dtos: list[ColumnDTO]) -> list[Column]: ...
+
+ @overload
+ @staticmethod
+ def from_dtos(dtos: list[IndexDTO]) -> list[Index]: ...
+
+ @overload
+ @staticmethod
+ def from_dtos(dtos: list[SortOrderDTO]) -> list[SortOrder]: ...
+
+ @overload
+ @staticmethod
+ def from_dtos(dtos: list[Partitioning]) -> list[Transform]: ...
+
+ @staticmethod
+ def from_dtos(dtos):
+ """Converts list of `ColumnDTO`, `IndexDTO`, `SortOrderDTO`, or
`Partitioning`
+ to the corresponding list of `Column`s, `Index`es, `SortOrder`s, or
`Transform`s.
+
+ Args:
+ dtos (list[ColumnDTO] | list[IndexDTO] | list[SortOrderDTO] |
list[Partitioning]):
+ The DTOs to be converted.
+
+ Returns:
+ list[Column] | list[Index] | list[SortOrder] | list[Transform]:
+ The list of Columns, Indexes, SortOrders, or Transforms
depends on the input DTOs.
+ """
+ if not dtos:
+ return []
+ return [DTOConverters.from_dto(dto) for dto in dtos]
diff --git a/clients/client-python/tests/unittests/api/rel/test_transforms.py
b/clients/client-python/tests/unittests/api/rel/test_transforms.py
index 3694b5a6cf..bf880a2d7d 100644
--- a/clients/client-python/tests/unittests/api/rel/test_transforms.py
+++ b/clients/client-python/tests/unittests/api/rel/test_transforms.py
@@ -153,7 +153,7 @@ class TestTransforms(unittest.TestCase):
def test_list_transform(self):
list_transform = Transforms.list(["createTime"], ["city"])
list_transform_with_assignments = Transforms.list(
- ["createTime", "city"], assignments=[]
+ field_names=[["createTime", "city"]], assignments=[]
)
trans_dict = {
list_transform: 1,
diff --git
a/clients/client-python/tests/unittests/dto/util/test_dto_converters.py
b/clients/client-python/tests/unittests/dto/util/test_dto_converters.py
index a7c685c70b..2fd7a1ec55 100644
--- a/clients/client-python/tests/unittests/dto/util/test_dto_converters.py
+++ b/clients/client-python/tests/unittests/dto/util/test_dto_converters.py
@@ -17,21 +17,54 @@
import unittest
from datetime import datetime
+from itertools import product
from random import randint, random
from typing import cast
from unittest.mock import MagicMock, patch
+from gravitino.api.rel.column import Column
+from gravitino.api.rel.expressions.distributions.distributions import
Distributions
+from gravitino.api.rel.expressions.distributions.strategy import Strategy
from gravitino.api.rel.expressions.expression import Expression
from gravitino.api.rel.expressions.function_expression import
FunctionExpression
from gravitino.api.rel.expressions.literals.literals import Literals
from gravitino.api.rel.expressions.named_reference import FieldReference
+from gravitino.api.rel.expressions.sorts.null_ordering import NullOrdering
+from gravitino.api.rel.expressions.sorts.sort_direction import SortDirection
+from gravitino.api.rel.expressions.sorts.sort_orders import SortOrders
+from gravitino.api.rel.expressions.transforms.transforms import Transforms
from gravitino.api.rel.expressions.unparsed_expression import
UnparsedExpression
+from gravitino.api.rel.indexes.index import Index
+from gravitino.api.rel.indexes.indexes import Indexes
+from gravitino.api.rel.table import Table
from gravitino.api.rel.types.types import Types
+from gravitino.dto.rel.column_dto import ColumnDTO
+from gravitino.dto.rel.distribution_dto import DistributionDTO
from gravitino.dto.rel.expressions.field_reference_dto import FieldReferenceDTO
from gravitino.dto.rel.expressions.func_expression_dto import FuncExpressionDTO
from gravitino.dto.rel.expressions.function_arg import FunctionArg
from gravitino.dto.rel.expressions.literal_dto import LiteralDTO
from gravitino.dto.rel.expressions.unparsed_expression_dto import
UnparsedExpressionDTO
+from gravitino.dto.rel.indexes.index_dto import IndexDTO
+from gravitino.dto.rel.partitioning.bucket_partitioning_dto import
BucketPartitioningDTO
+from gravitino.dto.rel.partitioning.day_partitioning_dto import
DayPartitioningDTO
+from gravitino.dto.rel.partitioning.function_partitioning_dto import (
+ FunctionPartitioningDTO,
+)
+from gravitino.dto.rel.partitioning.hour_partitioning_dto import
HourPartitioningDTO
+from gravitino.dto.rel.partitioning.identity_partitioning_dto import (
+ IdentityPartitioningDTO,
+)
+from gravitino.dto.rel.partitioning.list_partitioning_dto import
ListPartitioningDTO
+from gravitino.dto.rel.partitioning.month_partitioning_dto import
MonthPartitioningDTO
+from gravitino.dto.rel.partitioning.partitioning import Partitioning
+from gravitino.dto.rel.partitioning.range_partitioning_dto import
RangePartitioningDTO
+from gravitino.dto.rel.partitioning.truncate_partitioning_dto import (
+ TruncatePartitioningDTO,
+)
+from gravitino.dto.rel.partitioning.year_partitioning_dto import
YearPartitioningDTO
+from gravitino.dto.rel.sort_order_dto import SortOrderDTO
+from gravitino.dto.rel.table_dto import TableDTO
from gravitino.dto.util.dto_converters import DTOConverters
from gravitino.exceptions.base import IllegalArgumentException
@@ -60,6 +93,133 @@ class TestDTOConverters(unittest.TestCase):
Types.VarCharType.of(10): "test",
Types.FixedCharType.of(10): "test",
}
+ cls.table_dto_json = """
+ {
+ "name": "example_table",
+ "comment": "This is an example table",
+ "audit": {
+ "creator": "Apache Gravitino",
+ "createTime":"2025-10-10T00:00:00"
+ },
+ "columns": [
+ {
+ "name": "id",
+ "type": "integer",
+ "comment": "id column comment",
+ "nullable": false,
+ "autoIncrement": true,
+ "defaultValue": {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "-1"
+ }
+ },
+ {
+ "name": "name",
+ "type": "varchar(500)",
+ "comment": "name column comment",
+ "nullable": true,
+ "autoIncrement": false,
+ "defaultValue": {
+ "type": "literal",
+ "dataType": "null",
+ "value": "null"
+ }
+ },
+ {
+ "name": "StartingDate",
+ "type": "timestamp",
+ "comment": "StartingDate column comment",
+ "nullable": false,
+ "autoIncrement": false,
+ "defaultValue": {
+ "type": "function",
+ "funcName": "current_timestamp",
+ "funcArgs": []
+ }
+ },
+ {
+ "name": "info",
+ "type": {
+ "type": "struct",
+ "fields": [
+ {
+ "name": "position",
+ "type": "string",
+ "nullable": true,
+ "comment": "position field comment"
+ },
+ {
+ "name": "contact",
+ "type": {
+ "type": "list",
+ "elementType": "integer",
+ "containsNull": false
+ },
+ "nullable": true,
+ "comment": "contact field comment"
+ },
+ {
+ "name": "rating",
+ "type": {
+ "type": "map",
+ "keyType": "string",
+ "valueType": "integer",
+ "valueContainsNull": false
+ },
+ "nullable": true,
+ "comment": "rating field comment"
+ }
+ ]
+ },
+ "comment": "info column comment",
+ "nullable": true
+ },
+ {
+ "name": "dt",
+ "type": "date",
+ "comment": "dt column comment",
+ "nullable": true
+ }
+ ],
+ "partitioning": [
+ {
+ "strategy": "identity",
+ "fieldName": [ "dt" ]
+ }
+ ],
+ "distribution": {
+ "strategy": "hash",
+ "number": 32,
+ "funcArgs": [
+ {
+ "type": "field",
+ "fieldName": [ "id" ]
+ }
+ ]
+ },
+ "sortOrders": [
+ {
+ "sortTerm": {
+ "type": "field",
+ "fieldName": [ "age" ]
+ },
+ "direction": "asc",
+ "nullOrdering": "nulls_first"
+ }
+ ],
+ "indexes": [
+ {
+ "indexType": "primary_key",
+ "name": "PRIMARY",
+ "fieldNames": [["id"]]
+ }
+ ],
+ "properties": {
+ "format": "ORC"
+ }
+ }
+ """
def test_from_function_arg_literal_dto(self):
for data_type, value in TestDTOConverters.literals.items():
@@ -146,3 +306,287 @@ class TestDTOConverters(unittest.TestCase):
self.assertListEqual(
DTOConverters.from_function_args([]), Expression.EMPTY_EXPRESSION
)
+
+ def test_from_dto_distribution(self):
+ field_names = [f"field_{i}" for i in range(2)]
+ field_ref_dtos = [
+
FieldReferenceDTO.builder().with_field_name(field_name=[field_name]).build()
+ for field_name in field_names
+ ]
+ distribution_dto = DistributionDTO(
+ strategy=Strategy.HASH, number=4, args=field_ref_dtos
+ )
+ expected = Distributions.of(
+ Strategy.HASH, 4, *DTOConverters.from_function_args(field_ref_dtos)
+ )
+ converted = DTOConverters.from_dto(distribution_dto)
+ self.assertTrue(converted == expected)
+ self.assertTrue(
+ DTOConverters.from_dto(DistributionDTO.NONE) == Distributions.NONE
+ )
+
+ def test_from_dto_index(self):
+ field_names = [[f"field_{i}"] for i in range(2)]
+
+ index_dto = IndexDTO(
+ index_type=Index.IndexType.PRIMARY_KEY,
+ name="PRIMARY",
+ field_names=field_names,
+ )
+ converted = DTOConverters.from_dto(index_dto)
+ expected = Indexes.of(Index.IndexType.PRIMARY_KEY, "PRIMARY",
field_names)
+ self.assertTrue(converted.type() == expected.type())
+ self.assertTrue(converted.name() == expected.name())
+ self.assertListEqual(converted.field_names(), expected.field_names())
+
+ def test_from_dto_raises_exception(self):
+ with self.assertRaisesRegex(IllegalArgumentException, "Unsupported DTO
type"):
+ DTOConverters.from_dto("")
+
+ def test_from_dto_sort_order(self):
+ direction, null_ordering = SortDirection.ASCENDING,
NullOrdering.NULLS_LAST
+ field_ref_dto = (
+
FieldReferenceDTO.builder().with_field_name(field_name=["score"]).build()
+ )
+ sort_order_dto = SortOrderDTO(
+ sort_term=field_ref_dto,
+ direction=direction,
+ null_ordering=null_ordering,
+ )
+ expected = SortOrders.of(
+ expression=DTOConverters.from_function_arg(field_ref_dto),
+ direction=direction,
+ null_ordering=null_ordering,
+ )
+ converted = DTOConverters.from_dto(sort_order_dto)
+ self.assertTrue(converted == expected)
+
+ def test_from_dto_column(self):
+ column_name = "test_column"
+ column_data_type = Types.IntegerType.get()
+
+ # Test for default value not set
+ column_dto = (
+ ColumnDTO.builder()
+ .with_name(column_name)
+ .with_data_type(column_data_type)
+ .build()
+ )
+ converted = DTOConverters.from_dto(column_dto)
+ self.assertTrue(converted.default_value() ==
Column.DEFAULT_VALUE_NOT_SET)
+ self.assertTrue(converted == column_dto)
+
+ column_dto = (
+ ColumnDTO.builder()
+ .with_name(column_name)
+ .with_data_type(column_data_type)
+ .with_default_value(
+ LiteralDTO.builder()
+ .with_data_type(column_data_type)
+ .with_value("1")
+ .build()
+ )
+ .build()
+ )
+ expected = Column.of(
+ name=column_name,
+ data_type=column_data_type,
+ default_value=Literals.of(value="1", data_type=column_data_type),
+ )
+ converted = DTOConverters.from_dto(column_dto)
+ self.assertTrue(converted.default_value() !=
Column.DEFAULT_VALUE_NOT_SET)
+ self.assertTrue(expected.default_value() !=
Column.DEFAULT_VALUE_NOT_SET)
+ self.assertTrue(converted == expected)
+
+ def test_from_dto_partitioning(self):
+ field_name = ["score"]
+ field_names = [field_name]
+ partitioning = {
+ Partitioning.Strategy.IDENTITY:
IdentityPartitioningDTO(*field_name),
+ Partitioning.Strategy.YEAR: YearPartitioningDTO(*field_name),
+ Partitioning.Strategy.MONTH: MonthPartitioningDTO(*field_name),
+ Partitioning.Strategy.DAY: DayPartitioningDTO(*field_name),
+ Partitioning.Strategy.HOUR: HourPartitioningDTO(*field_name),
+ Partitioning.Strategy.BUCKET: BucketPartitioningDTO(10,
*field_names),
+ Partitioning.Strategy.TRUNCATE: TruncatePartitioningDTO(10,
field_name),
+ Partitioning.Strategy.LIST: ListPartitioningDTO([["createTime"],
["city"]]),
+ Partitioning.Strategy.RANGE: RangePartitioningDTO(field_name),
+ Partitioning.Strategy.FUNCTION: FunctionPartitioningDTO(
+ "test_function",
+ LiteralDTO.builder()
+ .with_data_type(Types.IntegerType.get())
+ .with_value("-1")
+ .build(),
+ LiteralDTO.builder()
+ .with_data_type(Types.BooleanType.get())
+ .with_value("True")
+ .build(),
+ ),
+ }
+ transform = {
+ Partitioning.Strategy.IDENTITY: Transforms.identity(field_name),
+ Partitioning.Strategy.YEAR: Transforms.year(field_name),
+ Partitioning.Strategy.MONTH: Transforms.month(field_name),
+ Partitioning.Strategy.DAY: Transforms.day(field_name),
+ Partitioning.Strategy.HOUR: Transforms.hour(field_name),
+ Partitioning.Strategy.BUCKET: Transforms.bucket(10, *field_names),
+ Partitioning.Strategy.TRUNCATE: Transforms.truncate(10,
field_name),
+ Partitioning.Strategy.LIST: Transforms.list(["createTime"],
["city"]),
+ Partitioning.Strategy.RANGE: Transforms.range(field_name),
+ Partitioning.Strategy.FUNCTION: Transforms.apply(
+ "test_function",
+ [
+ Literals.of(value="-1", data_type=Types.IntegerType.get()),
+ Literals.of(value="True",
data_type=Types.BooleanType.get()),
+ ],
+ ),
+ }
+
+ for strategy, dto in partitioning.items():
+ expected = transform[strategy]
+ self.assertTrue(DTOConverters.from_dto(dto) == expected)
+
+ with (
+ patch.object(IdentityPartitioningDTO, "strategy") as mock_strategy,
+ self.assertRaisesRegex(
+ IllegalArgumentException, "Unsupported partitioning"
+ ),
+ ):
+ mock_strategy.return_value = "invalid_strategy"
+ DTOConverters.from_dto(IdentityPartitioningDTO(*field_name))
+
+ def test_from_dtos_index_dto(self):
+ field_names = [[f"field_{i}"] for i in range(2)]
+
+ dtos = [
+ IndexDTO(
+ index_type=index_type,
+ name=index_type.value,
+ field_names=field_names,
+ )
+ for index_type in Index.IndexType
+ ]
+ converted_dtos = DTOConverters.from_dtos(dtos)
+ expected_items = [
+ Indexes.of(index_type, index_type.value, field_names)
+ for index_type in Index.IndexType
+ ]
+ self.assertEqual(len(converted_dtos), len(expected_items))
+ for converted, expected in zip(converted_dtos, expected_items):
+ self.assertTrue(converted.type() == expected.type())
+ self.assertTrue(converted.name() == expected.name())
+ self.assertListEqual(converted.field_names(),
expected.field_names())
+
+ self.assertEqual(DTOConverters.from_dtos([]), [])
+
+ def test_from_dtos_sort_order(self):
+ directions = {SortDirection.ASCENDING, SortDirection.DESCENDING}
+ null_orderings = {NullOrdering.NULLS_LAST, NullOrdering.NULLS_FIRST}
+ field_names = [
+ [f"score_{i}"] for i in range(len(directions) *
len(null_orderings))
+ ]
+ sort_order_dtos = []
+ expected = []
+ for field_name, (direction, null_ordering) in zip(
+ field_names, product(directions, null_orderings)
+ ):
+ field_ref_dto = (
+ FieldReferenceDTO.builder()
+ .with_field_name(field_name=field_name)
+ .build()
+ )
+ sort_order_dtos.append(
+ SortOrderDTO(
+ sort_term=field_ref_dto,
+ direction=direction,
+ null_ordering=null_ordering,
+ )
+ )
+ expected.append(
+ SortOrders.of(
+ expression=DTOConverters.from_function_arg(field_ref_dto),
+ direction=direction,
+ null_ordering=null_ordering,
+ )
+ )
+ converted = DTOConverters.from_dtos(sort_order_dtos)
+ self.assertListEqual(converted, expected)
+
+ def test_from_dtos_partitioning(self):
+ field_name = ["score"]
+ field_names = [field_name]
+ partitioning = [
+ IdentityPartitioningDTO(*field_name),
+ BucketPartitioningDTO(10, *field_names),
+ ]
+ transform = [
+ Transforms.identity(field_name),
+ Transforms.bucket(10, *field_names),
+ ]
+ converted = DTOConverters.from_dtos(partitioning)
+ self.assertListEqual(converted, transform)
+
+ def test_from_dtos_column_dto(self):
+ column_names = {f"column_{i}" for i in range(2)}
+ column_data_types = {Types.IntegerType.get(), Types.BooleanType.get()}
+ column_dtos: list[ColumnDTO] = [
+ ColumnDTO.builder()
+ .with_name(column_name)
+ .with_data_type(column_data_type)
+ .build()
+ for column_name, column_data_type in zip(column_names,
column_data_types)
+ ]
+ self.assertListEqual(DTOConverters.from_dtos(column_dtos), column_dtos)
+
+ def test_from_dtos_column_dto_with_default_value(self):
+ column_names = {f"column_{i}" for i in range(2)}
+ column_data_types = {Types.IntegerType.get(), Types.BooleanType.get()}
+ column_dto_default_values = {
+ LiteralDTO.builder()
+ .with_data_type(Types.IntegerType.get())
+ .with_value("1")
+ .build(),
+ LiteralDTO.builder()
+ .with_data_type(Types.BooleanType.get())
+ .with_value("True")
+ .build(),
+ }
+ column_dtos: list[ColumnDTO] = [
+ ColumnDTO.builder()
+ .with_name(column_name)
+ .with_data_type(column_data_type)
+ .with_default_value(default_value)
+ .build()
+ for column_name, column_data_type, default_value in zip(
+ column_names, column_data_types, column_dto_default_values
+ )
+ ]
+ expected = [DTOConverters.from_dto(dto) for dto in column_dtos]
+ converted = DTOConverters.from_dtos(column_dtos)
+ self.assertListEqual(converted, expected)
+
+ def test_from_dto_table_dto(self):
+ dto = TableDTO.from_json(self.table_dto_json)
+ converted = DTOConverters.from_dto(dto)
+ table = cast(Table, converted)
+ self.assertIsInstance(converted, Table)
+ self.assertEqual(table.name(), dto.name())
+ self.assertEqual(table.comment(), dto.comment())
+ self.assertListEqual(table.columns(),
DTOConverters.from_dtos(dto.columns()))
+ self.assertListEqual(
+ table.partitioning(), DTOConverters.from_dtos(dto.partitioning())
+ )
+ self.assertListEqual(
+ table.sort_order(), DTOConverters.from_dtos(dto.sort_order())
+ )
+ for table_index, dto_index in zip(
+ table.index(), DTOConverters.from_dtos(dto.index())
+ ):
+ self.assertEqual(table_index.name(), dto_index.name())
+ self.assertEqual(table_index.type(), dto_index.type())
+ self.assertListEqual(table_index.field_names(),
dto_index.field_names())
+ self.assertEqual(
+ table.distribution(), DTOConverters.from_dto(dto.distribution())
+ )
+ self.assertEqual(table.audit_info(), dto.audit_info())
+ self.assertEqual(table.properties(), dto.properties())