This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ec2f29423 [#5199] feat(client-python): add partition DTO serdes 
(#8128)
6ec2f29423 is described below

commit 6ec2f2942397bf1f2f76da91f6006e1e0265180c
Author: George T. C. Lai <[email protected]>
AuthorDate: Tue Aug 26 16:39:35 2025 +0800

    [#5199] feat(client-python): add partition DTO serdes (#8128)
    
    ### What changes were proposed in this pull request?
    
    This PR is aimed at implementing the following classes/methods
    corresponding to the Java client.
    
    JsonUtils.java
    - method `readPartition`
    - method `writePartition`
    - class `PartitionDTOSerializer`
    - class `PartitionDTODeserializer`
    
    ### Why are the changes needed?
    
    We need to support table partitioning, bucketing, sort ordering, and
    indexes.
    
    #5199
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    ---------
    
    Signed-off-by: George T. C. Lai <[email protected]>
    Co-authored-by: Jerry Shao <[email protected]>
---
 .../gravitino/api/types/json_serdes/base.py        |   3 +-
 .../{partition_dto.py => json_serdes/__init__.py}  |  29 --
 .../_helper/__init__.py}                           |  29 --
 .../partitions/json_serdes/_helper/serdes_utils.py | 168 +++++++
 .../partition_dto_serdes.py}                       |  35 +-
 .../gravitino/dto/rel/partitions/partition_dto.py  |   5 +-
 clients/client-python/gravitino/utils/serdes.py    |   1 +
 .../unittests/dto/rel/test_partition_dto_serdes.py | 483 +++++++++++++++++++++
 8 files changed, 668 insertions(+), 85 deletions(-)

diff --git a/clients/client-python/gravitino/api/types/json_serdes/base.py 
b/clients/client-python/gravitino/api/types/json_serdes/base.py
index 853eeae66e..5dd89a0266 100644
--- a/clients/client-python/gravitino/api/types/json_serdes/base.py
+++ b/clients/client-python/gravitino/api/types/json_serdes/base.py
@@ -22,8 +22,9 @@ from dataclasses_json.core import Json
 
 from gravitino.api.expressions.expression import Expression
 from gravitino.api.types.types import Type
+from gravitino.dto.rel.partitions.partition_dto import PartitionDTO
 
-GravitinoTypeT = TypeVar("GravitinoTypeT", bound=Union[Expression, Type])
+GravitinoTypeT = TypeVar("GravitinoTypeT", bound=Union[Expression, Type, 
PartitionDTO])
 
 
 class JsonSerializable(ABC, Generic[GravitinoTypeT]):
diff --git 
a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py 
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/__init__.py
similarity index 52%
copy from clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
copy to 
clients/client-python/gravitino/dto/rel/partitions/json_serdes/__init__.py
index cbfddd0ab5..13a83393a9 100644
--- a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
+++ b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/__init__.py
@@ -14,32 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-
-from abc import abstractmethod
-from enum import Enum
-
-from gravitino.api.expressions.partitions.partition import Partition
-
-
-class PartitionDTO(Partition):
-    """Represents a Partition Data Transfer Object (DTO) that implements the 
Partition interface."""
-
-    class Type(Enum):
-        """Type of the partition."""
-
-        RANGE = "range"
-        """The range partition type."""
-        LIST = "list"
-        """The list partition type."""
-        IDENTITY = "identity"
-        """The identity partition type."""
-
-    @abstractmethod
-    def type(self) -> Type:
-        """Gets the type of the partition.
-
-        Returns:
-            Type: The type of the partition.
-        """
-        pass  # pragma: no cover
diff --git 
a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py 
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/__init__.py
similarity index 52%
copy from clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
copy to 
clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/__init__.py
index cbfddd0ab5..13a83393a9 100644
--- a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
+++ 
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/__init__.py
@@ -14,32 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-
-from abc import abstractmethod
-from enum import Enum
-
-from gravitino.api.expressions.partitions.partition import Partition
-
-
-class PartitionDTO(Partition):
-    """Represents a Partition Data Transfer Object (DTO) that implements the 
Partition interface."""
-
-    class Type(Enum):
-        """Type of the partition."""
-
-        RANGE = "range"
-        """The range partition type."""
-        LIST = "list"
-        """The list partition type."""
-        IDENTITY = "identity"
-        """The identity partition type."""
-
-    @abstractmethod
-    def type(self) -> Type:
-        """Gets the type of the partition.
-
-        Returns:
-            Type: The type of the partition.
-        """
-        pass  # pragma: no cover
diff --git 
a/clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/serdes_utils.py
 
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/serdes_utils.py
new file mode 100644
index 0000000000..6c8732a4ad
--- /dev/null
+++ 
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/serdes_utils.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from contextlib import suppress
+from typing import Any, Dict, List, cast
+
+from gravitino.dto.rel.expressions.json_serdes._helper.serdes_utils import (
+    SerdesUtils as ExpressionsSerdesUtils,
+)
+from gravitino.dto.rel.expressions.literal_dto import LiteralDTO
+from gravitino.dto.rel.partitions.identity_partition_dto import 
IdentityPartitionDTO
+from gravitino.dto.rel.partitions.list_partition_dto import ListPartitionDTO
+from gravitino.dto.rel.partitions.partition_dto import PartitionDTO
+from gravitino.dto.rel.partitions.range_partition_dto import RangePartitionDTO
+from gravitino.exceptions.base import IllegalArgumentException
+from gravitino.utils.precondition import Precondition
+from gravitino.utils.serdes import SerdesUtilsBase
+
+
+class SerdesUtils(SerdesUtilsBase):
+    @classmethod
+    def write_partition(cls, value: PartitionDTO) -> Dict[str, Any]:
+        result = {
+            cls.PARTITION_TYPE: value.type().value,
+            cls.PARTITION_NAME: value.name(),
+        }
+        dto_data = {}
+        dto_type = value.type()
+
+        if dto_type not in PartitionDTO.Type:
+            raise IOError(f"Unknown partition type: {value.type()}")
+
+        if dto_type is PartitionDTO.Type.IDENTITY:
+            dto = cast(IdentityPartitionDTO, value)
+            dto_data[cls.FIELD_NAMES] = dto.field_names()
+            dto_data[cls.IDENTITY_PARTITION_VALUES] = [
+                ExpressionsSerdesUtils.write_function_arg(arg=arg)
+                for arg in dto.values()
+            ]
+
+        if dto_type is PartitionDTO.Type.LIST:
+            dto = cast(ListPartitionDTO, value)
+            dto_data[cls.LIST_PARTITION_LISTS] = [
+                [ExpressionsSerdesUtils.write_function_arg(arg=arg) for arg in 
args]
+                for args in dto.lists()
+            ]
+
+        if dto_type is PartitionDTO.Type.RANGE:
+            dto = cast(RangePartitionDTO, value)
+            dto_data[cls.RANGE_PARTITION_UPPER] = (
+                ExpressionsSerdesUtils.write_function_arg(arg=dto.upper())
+            )
+            dto_data[cls.RANGE_PARTITION_LOWER] = (
+                ExpressionsSerdesUtils.write_function_arg(arg=dto.lower())
+            )
+
+        dto_data[cls.PARTITION_PROPERTIES] = value.properties()
+        result.update(dto_data)
+        return result
+
+    @classmethod
+    def read_partition(cls, data: Dict[str, Any]) -> PartitionDTO:
+        Precondition.check_argument(
+            isinstance(data, Dict) and len(data) > 0,
+            f"Partition must be a valid JSON object, but found: {data}",
+        )
+        Precondition.check_argument(
+            data.get(cls.PARTITION_TYPE) is not None,
+            f"Partition must have a type field, but found: {data}",
+        )
+        dto_type = None
+        with suppress(ValueError):
+            dto_type = PartitionDTO.Type(data[cls.PARTITION_TYPE])
+
+        if dto_type is PartitionDTO.Type.IDENTITY:
+            Precondition.check_argument(
+                isinstance(data.get(cls.FIELD_NAMES), List),
+                f"Identity partition must have array of fieldNames, but found: 
{data}",
+            )
+            Precondition.check_argument(
+                isinstance(data.get(cls.IDENTITY_PARTITION_VALUES), List),
+                f"Identity partition must have array of values, but found: 
{data}",
+            )
+            return IdentityPartitionDTO(
+                name=data[cls.PARTITION_NAME],
+                field_names=data[cls.FIELD_NAMES],
+                values=[
+                    cast(
+                        LiteralDTO, 
ExpressionsSerdesUtils.read_function_arg(data=value)
+                    )
+                    for value in data[cls.IDENTITY_PARTITION_VALUES]
+                ],
+                properties=data.get(cls.PARTITION_PROPERTIES, {}),
+            )
+
+        if dto_type is PartitionDTO.Type.LIST:
+            Precondition.check_argument(
+                cls.PARTITION_NAME in data,
+                f"List partition must have name, but found: {data}",
+            )
+            Precondition.check_argument(
+                isinstance(data.get(cls.LIST_PARTITION_LISTS), List),
+                f"List partition must have array of lists, but found: {data}",
+            )
+            return ListPartitionDTO(
+                name=data[cls.PARTITION_NAME],
+                lists=[
+                    [
+                        cast(
+                            LiteralDTO,
+                            
ExpressionsSerdesUtils.read_function_arg(data=value),
+                        )
+                        for value in values
+                    ]
+                    for values in data[cls.LIST_PARTITION_LISTS]
+                ],
+                properties=data.get(cls.PARTITION_PROPERTIES, {}),
+            )
+
+        if dto_type is PartitionDTO.Type.RANGE:
+            Precondition.check_argument(
+                cls.PARTITION_NAME in data,
+                f"Range partition must have name, but found: {data}",
+            )
+            Precondition.check_argument(
+                cls.RANGE_PARTITION_UPPER in data,
+                f"Range partition must have upper, but found: {data}",
+            )
+            Precondition.check_argument(
+                cls.RANGE_PARTITION_LOWER in data,
+                f"Range partition must have lower, but found: {data}",
+            )
+            upper = cast(
+                LiteralDTO,
+                ExpressionsSerdesUtils.read_function_arg(
+                    data[cls.RANGE_PARTITION_UPPER]
+                ),
+            )
+            lower = cast(
+                LiteralDTO,
+                ExpressionsSerdesUtils.read_function_arg(
+                    data[cls.RANGE_PARTITION_LOWER]
+                ),
+            )
+            return RangePartitionDTO(
+                name=data[cls.PARTITION_NAME],
+                upper=upper,
+                lower=lower,
+                properties=data.get(cls.PARTITION_PROPERTIES, {}),
+            )
+
+        raise IllegalArgumentException(
+            f"Unknown partition type: {data[cls.PARTITION_TYPE]}"
+        )
diff --git 
a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py 
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/partition_dto_serdes.py
similarity index 52%
copy from clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
copy to 
clients/client-python/gravitino/dto/rel/partitions/json_serdes/partition_dto_serdes.py
index cbfddd0ab5..383ba8d3a9 100644
--- a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
+++ 
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/partition_dto_serdes.py
@@ -15,31 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from typing import Any, Dict
 
-from abc import abstractmethod
-from enum import Enum
+from gravitino.api.types.json_serdes.base import JsonSerializable
+from gravitino.dto.rel.partitions.json_serdes._helper.serdes_utils import 
SerdesUtils
+from gravitino.dto.rel.partitions.partition_dto import PartitionDTO
 
-from gravitino.api.expressions.partitions.partition import Partition
 
+class PartitionDTOSerdes(JsonSerializable[PartitionDTO]):
+    @classmethod
+    def serialize(cls, data_type: PartitionDTO) -> Dict[str, Any]:
+        return SerdesUtils.write_partition(data_type)
 
-class PartitionDTO(Partition):
-    """Represents a Partition Data Transfer Object (DTO) that implements the 
Partition interface."""
-
-    class Type(Enum):
-        """Type of the partition."""
-
-        RANGE = "range"
-        """The range partition type."""
-        LIST = "list"
-        """The list partition type."""
-        IDENTITY = "identity"
-        """The identity partition type."""
-
-    @abstractmethod
-    def type(self) -> Type:
-        """Gets the type of the partition.
-
-        Returns:
-            Type: The type of the partition.
-        """
-        pass  # pragma: no cover
+    @classmethod
+    def deserialize(cls, data: Dict[str, Any]) -> PartitionDTO:
+        return SerdesUtils.read_partition(data)
diff --git 
a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py 
b/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
index cbfddd0ab5..40047d6901 100644
--- a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
+++ b/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
@@ -17,7 +17,7 @@
 
 
 from abc import abstractmethod
-from enum import Enum
+from enum import Enum, unique
 
 from gravitino.api.expressions.partitions.partition import Partition
 
@@ -25,7 +25,8 @@ from gravitino.api.expressions.partitions.partition import 
Partition
 class PartitionDTO(Partition):
     """Represents a Partition Data Transfer Object (DTO) that implements the 
Partition interface."""
 
-    class Type(Enum):
+    @unique
+    class Type(str, Enum):
         """Type of the partition."""
 
         RANGE = "range"
diff --git a/clients/client-python/gravitino/utils/serdes.py 
b/clients/client-python/gravitino/utils/serdes.py
index e67f03edc2..ed3371dbe8 100644
--- a/clients/client-python/gravitino/utils/serdes.py
+++ b/clients/client-python/gravitino/utils/serdes.py
@@ -53,6 +53,7 @@ class SerdesUtilsBase:
 
     PARTITION_TYPE: Final[str] = "type"
     PARTITION_NAME: Final[str] = "name"
+    PARTITION_PROPERTIES: Final[str] = "properties"
     FIELD_NAMES: Final[str] = "fieldNames"
     IDENTITY_PARTITION_VALUES: Final[str] = "values"
     LIST_PARTITION_LISTS: Final[str] = "lists"
diff --git 
a/clients/client-python/tests/unittests/dto/rel/test_partition_dto_serdes.py 
b/clients/client-python/tests/unittests/dto/rel/test_partition_dto_serdes.py
new file mode 100644
index 0000000000..464c705ad8
--- /dev/null
+++ b/clients/client-python/tests/unittests/dto/rel/test_partition_dto_serdes.py
@@ -0,0 +1,483 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import unittest
+from enum import Enum
+from typing import cast
+from unittest.mock import patch
+
+from gravitino.api.types.types import Types
+from gravitino.dto.rel.expressions.json_serdes._helper.serdes_utils import (
+    SerdesUtils as ExpressionSerdesUtils,
+)
+from gravitino.dto.rel.expressions.literal_dto import LiteralDTO
+from gravitino.dto.rel.partitions.identity_partition_dto import 
IdentityPartitionDTO
+from gravitino.dto.rel.partitions.json_serdes._helper.serdes_utils import 
SerdesUtils
+from gravitino.dto.rel.partitions.json_serdes.partition_dto_serdes import (
+    PartitionDTOSerdes,
+)
+from gravitino.dto.rel.partitions.list_partition_dto import ListPartitionDTO
+from gravitino.dto.rel.partitions.partition_dto import PartitionDTO
+from gravitino.dto.rel.partitions.range_partition_dto import RangePartitionDTO
+from gravitino.exceptions.base import IllegalArgumentException
+
+
+class MockPartitionDTOType(str, Enum):
+    INVALID_PARTITION_TYPE = "invalid_partition_type"
+
+
+class TestPartitionSerdesUtils(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls) -> None:
+        cls.literal_values = {
+            "upper": LiteralDTO.builder()
+            .with_data_type(Types.IntegerType.get())
+            .with_value(value="0")
+            .build(),
+            "lower": LiteralDTO.builder()
+            .with_data_type(Types.IntegerType.get())
+            .with_value(value="100")
+            .build(),
+        }
+        cls.field_names = [["upper"], ["lower"]]
+        cls.properties = {"key1": "value1", "key2": "value2"}
+        cls.identity_partition_dto = IdentityPartitionDTO(
+            name="test_identity_partition",
+            values=list(cls.literal_values.values()),
+            field_names=cls.field_names,
+            properties=cls.properties,
+        )
+        cls.range_partition_dto = RangePartitionDTO(
+            name="test_range_partition",
+            properties=cls.properties,
+            **cls.literal_values,
+        )
+        cls.list_partition_dto = ListPartitionDTO(
+            name="test_list_partition",
+            lists=[[literal_value] for literal_value in 
cls.literal_values.values()],
+            properties=cls.properties,
+        )
+        cls.partition_dtos = {
+            PartitionDTO.Type.IDENTITY: cls.identity_partition_dto,
+            PartitionDTO.Type.LIST: cls.list_partition_dto,
+            PartitionDTO.Type.RANGE: cls.range_partition_dto,
+        }
+
+    def test_write_partition_dto_unknown_type(self):
+        """Test that unknown partition types should raise IOError."""
+
+        with patch.object(
+            self.identity_partition_dto,
+            "type",
+            return_value=MockPartitionDTOType.INVALID_PARTITION_TYPE,
+        ):
+            self.assertRaises(
+                IOError,
+                SerdesUtils.write_partition,
+                value=self.identity_partition_dto,
+            )
+
+    @patch(
+        
"gravitino.dto.rel.expressions.json_serdes._helper.serdes_utils.SerdesUtils.write_function_arg"
+    )
+    def test_write_partition_with_mocked_expression_serdes(
+        self, mock_write_function_arg
+    ):
+        """Test write_partition with mocked expression serialization.
+
+        To make sure the number of call to method 
`ExpressionsSerdesUtils.write_function_arg` are
+        identical to the number of `LiteralDTO`s.
+        """
+
+        for partition_dto in self.partition_dtos.values():
+            mock_write_function_arg.reset_mock()
+            mock_write_function_arg_return = {"mocked": "function_arg"}
+            mock_write_function_arg.return_value = 
mock_write_function_arg_return
+            partition_dto_type = partition_dto.type()
+
+            result = SerdesUtils.write_partition(partition_dto)
+
+            self.assertEqual(
+                mock_write_function_arg.call_count, len(self.literal_values)
+            )
+            if partition_dto_type is PartitionDTO.Type.IDENTITY:
+                self.assertEqual(
+                    result[SerdesUtils.IDENTITY_PARTITION_VALUES],
+                    [mock_write_function_arg_return] * 
len(self.literal_values),
+                )
+            if partition_dto_type is PartitionDTO.Type.LIST:
+                self.assertEqual(
+                    result[SerdesUtils.LIST_PARTITION_LISTS],
+                    [[mock_write_function_arg_return]] * 
len(self.literal_values),
+                )
+            if partition_dto_type is PartitionDTO.Type.RANGE:
+                self.assertEqual(
+                    [
+                        result[SerdesUtils.RANGE_PARTITION_LOWER],
+                        result[SerdesUtils.RANGE_PARTITION_UPPER],
+                    ],
+                    [mock_write_function_arg_return] * 
len(self.literal_values),
+                )
+
+    def test_write_partition_dto(self):
+        """Test writing PartitionDTOs"""
+
+        for partition_dto_type, partition_dto in self.partition_dtos.items():
+            result = SerdesUtils.write_partition(partition_dto)
+
+            self.assertEqual(
+                result[SerdesUtils.PARTITION_TYPE], partition_dto_type.value
+            )
+            self.assertEqual(
+                result[SerdesUtils.PARTITION_NAME],
+                f"test_{partition_dto_type.value}_partition",
+            )
+            if partition_dto_type is PartitionDTO.Type.IDENTITY:
+                self.assertEqual(result[SerdesUtils.FIELD_NAMES], 
self.field_names)
+                self.assertIn(SerdesUtils.IDENTITY_PARTITION_VALUES, result)
+                self.assertListEqual(
+                    result[SerdesUtils.IDENTITY_PARTITION_VALUES],
+                    [
+                        ExpressionSerdesUtils.write_function_arg(literal_value)
+                        for literal_value in self.literal_values.values()
+                    ],
+                )
+            if partition_dto_type is PartitionDTO.Type.LIST:
+                self.assertListEqual(
+                    result[SerdesUtils.LIST_PARTITION_LISTS],
+                    [
+                        
[ExpressionSerdesUtils.write_function_arg(literal_value)]
+                        for literal_value in self.literal_values.values()
+                    ],
+                )
+            if partition_dto_type is PartitionDTO.Type.RANGE:
+                self.assertEqual(
+                    result[SerdesUtils.RANGE_PARTITION_LOWER],
+                    ExpressionSerdesUtils.write_function_arg(
+                        
arg=self.literal_values[SerdesUtils.RANGE_PARTITION_LOWER]
+                    ),
+                )
+                self.assertEqual(
+                    result[SerdesUtils.RANGE_PARTITION_UPPER],
+                    ExpressionSerdesUtils.write_function_arg(
+                        
arg=self.literal_values[SerdesUtils.RANGE_PARTITION_UPPER]
+                    ),
+                )
+            self.assertDictEqual(
+                result[SerdesUtils.PARTITION_PROPERTIES], 
partition_dto.properties()
+            )
+
+    def test_write_partition_empty_values(self):
+        """Test writing partition with empty values."""
+
+        empty_partition = IdentityPartitionDTO(
+            name="empty_partition", values=[], field_names=[], properties={}
+        )
+
+        result = SerdesUtils.write_partition(empty_partition)
+
+        self.assertEqual(result[SerdesUtils.PARTITION_NAME], "empty_partition")
+        self.assertEqual(result[SerdesUtils.FIELD_NAMES], [])
+        self.assertEqual(result[SerdesUtils.IDENTITY_PARTITION_VALUES], [])
+
+    def test_read_partition_dto_invalid_type(self):
+        data = {SerdesUtils.PARTITION_TYPE: "invalid_type"}
+        with self.assertRaises(IllegalArgumentException):
+            SerdesUtils.read_partition(data=data)
+
+    def test_read_partition_dto_invalid_data(self):
+        invalid_json_data = (None, {})
+        for data in invalid_json_data:
+            self.assertRaisesRegex(
+                IllegalArgumentException,
+                "Partition must be a valid JSON object",
+                SerdesUtils.read_partition,
+                data=data,
+            )
+
+        self.assertRaisesRegex(
+            IllegalArgumentException,
+            "Partition must have a type field",
+            SerdesUtils.read_partition,
+            data={"invalid_field": "invalid_value"},
+        )
+
+    def test_read_partition_invalid_identity(self):
+        identity_data_base = {
+            SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.IDENTITY.value
+        }
+        invalid_field_names_data = (
+            identity_data_base,
+            {**identity_data_base, **{SerdesUtils.FIELD_NAMES: 
"invalid_field_names"}},
+        )
+        invalid_values_data = (
+            {**identity_data_base, **{SerdesUtils.FIELD_NAMES: []}},
+            {
+                **identity_data_base,
+                **{
+                    SerdesUtils.FIELD_NAMES: [],
+                    SerdesUtils.IDENTITY_PARTITION_VALUES: "invalid_values",
+                },
+            },
+        )
+        for data in invalid_field_names_data:
+            self.assertRaisesRegex(
+                IllegalArgumentException,
+                "Identity partition must have array of fieldNames",
+                SerdesUtils.read_partition,
+                data=data,
+            )
+        for data in invalid_values_data:
+            self.assertRaisesRegex(
+                IllegalArgumentException,
+                "Identity partition must have array of values",
+                SerdesUtils.read_partition,
+                data=data,
+            )
+
+    def test_read_partition_invalid_list(self):
+        list_data_base = {
+            SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.LIST.value,
+            SerdesUtils.PARTITION_NAME: "list_partition",
+        }
+        invalid_partition_name = {
+            SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.LIST.value
+        }
+        invalid_lists_data = (
+            list_data_base,
+            {**list_data_base, **{SerdesUtils.LIST_PARTITION_LISTS: 
"invalid"}},
+        )
+        self.assertRaisesRegex(
+            IllegalArgumentException,
+            "List partition must have name",
+            SerdesUtils.read_partition,
+            data=invalid_partition_name,
+        )
+        for invalid_data in invalid_lists_data:
+            self.assertRaisesRegex(
+                IllegalArgumentException,
+                "List partition must have array of lists",
+                SerdesUtils.read_partition,
+                data=invalid_data,
+            )
+
+    def test_read_partition_invalid_range(self):
+        invalid_partition_name = {
+            SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.RANGE.value
+        }
+        invalid_range_data_upper = {
+            SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.RANGE.value,
+            SerdesUtils.PARTITION_NAME: "range_partition",
+        }
+        invalid_range_data_lower = {
+            SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.RANGE.value,
+            SerdesUtils.PARTITION_NAME: "range_partition",
+            SerdesUtils.RANGE_PARTITION_UPPER: "upper",
+        }
+        self.assertRaisesRegex(
+            IllegalArgumentException,
+            "Range partition must have name",
+            SerdesUtils.read_partition,
+            data=invalid_partition_name,
+        )
+        self.assertRaisesRegex(
+            IllegalArgumentException,
+            "Range partition must have upper",
+            SerdesUtils.read_partition,
+            data=invalid_range_data_upper,
+        )
+        self.assertRaisesRegex(
+            IllegalArgumentException,
+            "Range partition must have lower",
+            SerdesUtils.read_partition,
+            data=invalid_range_data_lower,
+        )
+
+    def test_read_partition_dto(self):
+        for partition_dto_type, partition_dto in self.partition_dtos.items():
+            result = SerdesUtils.write_partition(partition_dto)
+            partition_dto_read = SerdesUtils.read_partition(result)
+
+            self.assertEqual(partition_dto.name(), partition_dto_read.name())
+            self.assertEqual(partition_dto.type(), partition_dto_read.type())
+            if partition_dto_type is PartitionDTO.Type.IDENTITY:
+                dto = cast(IdentityPartitionDTO, partition_dto_read)
+                self.assertListEqual(partition_dto.field_names(), 
dto.field_names())
+                self.assertListEqual(partition_dto.values(), dto.values())
+                self.assertEqual(partition_dto.properties(), dto.properties())
+
+            if partition_dto_type is PartitionDTO.Type.LIST:
+                dto = cast(ListPartitionDTO, partition_dto_read)
+                self.assertListEqual(partition_dto.lists(), dto.lists())
+                self.assertEqual(partition_dto.properties(), dto.properties())
+
+            if partition_dto_type is PartitionDTO.Type.RANGE:
+                dto = cast(RangePartitionDTO, partition_dto_read)
+                self.assertEqual(partition_dto.lower(), dto.lower())
+                self.assertEqual(partition_dto.upper(), dto.upper())
+                self.assertEqual(partition_dto.properties(), dto.properties())
+
+    def test_partition_dto_serdes(self):
+        """Round-trips each fixture partition DTO through `PartitionDTOSerdes`.
+
+        Every entry of `self.partition_dtos` is serialized and then
+        deserialized; the result must agree with the original on name and
+        type, plus the accessors checked for its partition type below.
+        """
+
+        for partition_dto_type, partition_dto in self.partition_dtos.items():
+            # One sub-test per partition type: a failure is labelled with the
+            # type and the remaining types are still exercised.
+            with self.subTest(partition_dto_type=partition_dto_type):
+                serialized_data = PartitionDTOSerdes.serialize(partition_dto)
+                restored = PartitionDTOSerdes.deserialize(serialized_data)
+
+                self.assertEqual(partition_dto.name(), restored.name())
+                self.assertEqual(partition_dto.type(), restored.type())
+
+                # The type is fixed for this iteration, so the branches are
+                # mutually exclusive.
+                if partition_dto_type is PartitionDTO.Type.IDENTITY:
+                    dto = cast(IdentityPartitionDTO, restored)
+                    self.assertListEqual(
+                        partition_dto.field_names(), dto.field_names()
+                    )
+                    self.assertListEqual(partition_dto.values(), dto.values())
+                    self.assertEqual(partition_dto.properties(), dto.properties())
+                elif partition_dto_type is PartitionDTO.Type.LIST:
+                    dto = cast(ListPartitionDTO, restored)
+                    self.assertListEqual(partition_dto.lists(), dto.lists())
+                    self.assertEqual(partition_dto.properties(), dto.properties())
+                elif partition_dto_type is PartitionDTO.Type.RANGE:
+                    dto = cast(RangePartitionDTO, restored)
+                    self.assertEqual(partition_dto.lower(), dto.lower())
+                    self.assertEqual(partition_dto.upper(), dto.upper())
+                    self.assertEqual(partition_dto.properties(), dto.properties())
+
+    def test_partition_dto_serdes_identity_from_json_string(self):
+        """Tests deserialize `IdentityPartitionDTO` from JSON string.
+
+        The JSON document below is parsed with `json.loads`, handed to
+        `PartitionDTOSerdes.deserialize`, and the result is compared for
+        equality with the `self.identity_partition_dto` fixture.
+        """
+
+        expected_json_string = """
+            {
+                "type": "identity",
+                "name": "test_identity_partition",
+                "fieldNames": [
+                    [
+                        "upper"
+                    ],
+                    [
+                        "lower"
+                    ]
+                ],
+                "values": [
+                    {
+                        "type": "literal",
+                        "dataType": "integer",
+                        "value": "0"
+                    },
+                    {
+                        "type": "literal",
+                        "dataType": "integer",
+                        "value": "100"
+                    }
+                ],
+                "properties": {
+                    "key1": "value1",
+                    "key2": "value2"
+                }
+            }
+        """
+        expected_serialized = json.loads(expected_json_string)
+        deserialized_dto = PartitionDTOSerdes.deserialize(expected_serialized)
+        # assertEqual reports both operands on failure, unlike assertTrue(a == b).
+        self.assertEqual(deserialized_dto, self.identity_partition_dto)
+
+    def test_partition_dto_serdes_list_from_json_string(self):
+        """Checks that a `ListPartitionDTO` is rebuilt from its JSON form.
+
+        The payload has no "properties" key; the deserialized DTO is asserted
+        to report an empty dict for its properties.
+        """
+
+        # Second row of literals, built next to the fixture literals.
+        second_row = [
+            LiteralDTO.builder()
+            .with_data_type(Types.IntegerType.get())
+            .with_value(value=literal)
+            .build()
+            for literal in ("101", "200")
+        ]
+        reference_dto = ListPartitionDTO(
+            name="test_list_partition",
+            lists=[list(self.literal_values.values()), second_row],
+            properties={},
+        )
+        list_partition_json = """
+            {
+                "type": "list",
+                "name": "test_list_partition",
+                "lists": [
+                    [
+                        {
+                            "type": "literal",
+                            "dataType": "integer",
+                            "value": "0"
+                        },
+                        {
+                            "type": "literal",
+                            "dataType": "integer",
+                            "value": "100"
+                        }
+                    ],
+                    [
+                        {
+                            "type": "literal",
+                            "dataType": "integer",
+                            "value": "101"
+                        },
+                        {
+                            "type": "literal",
+                            "dataType": "integer",
+                            "value": "200"
+                        }
+                    ]
+                ]
+            }
+        """
+        payload = json.loads(list_partition_json)
+        rebuilt = PartitionDTOSerdes.deserialize(payload)
+        actual_dto = cast(ListPartitionDTO, rebuilt)
+
+        self.assertIs(actual_dto.type(), reference_dto.type())
+        self.assertEqual(actual_dto.name(), reference_dto.name())
+        self.assertListEqual(actual_dto.lists(), reference_dto.lists())
+        self.assertEqual(actual_dto.properties(), {})
+
+    def test_partition_dto_serdes_range_from_json_string(self):
+        """Tests deserialize `RangePartitionDTO` from JSON string.
+
+        The JSON document below is parsed with `json.loads`, handed to
+        `PartitionDTOSerdes.deserialize`, and the result is compared for
+        equality with the `self.range_partition_dto` fixture.
+        """
+
+        expected_json_string = """
+            {
+                "type": "range",
+                "name": "test_range_partition",
+                "upper": {
+                    "type": "literal",
+                    "dataType": "integer",
+                    "value": "0"
+                },
+                "lower": {
+                    "type": "literal",
+                    "dataType": "integer",
+                    "value": "100"
+                },
+                "properties": {
+                    "key1": "value1",
+                    "key2": "value2"
+                }
+            }
+        """
+        expected_serialized = json.loads(expected_json_string)
+        deserialized_dto = PartitionDTOSerdes.deserialize(expected_serialized)
+        # assertEqual reports both operands on failure, unlike assertTrue(a == b).
+        self.assertEqual(deserialized_dto, self.range_partition_dto)


Reply via email to