This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 6ec2f29423 [#5199] feat(client-python): add partition DTO serdes
(#8128)
6ec2f29423 is described below
commit 6ec2f2942397bf1f2f76da91f6006e1e0265180c
Author: George T. C. Lai <[email protected]>
AuthorDate: Tue Aug 26 16:39:35 2025 +0800
[#5199] feat(client-python): add partition DTO serdes (#8128)
### What changes were proposed in this pull request?
This PR implements the following classes/methods, mirroring their
counterparts in the Java client.
JsonUtils.java
- method `readPartition`
- method `writePartition`
- class `PartitionDTOSerializer`
- class `PartitionDTODeserializer`
### Why are the changes needed?
We need to support table partitioning, bucketing, sort ordering, and
indexes.
#5199
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
---------
Signed-off-by: George T. C. Lai <[email protected]>
Co-authored-by: Jerry Shao <[email protected]>
---
.../gravitino/api/types/json_serdes/base.py | 3 +-
.../{partition_dto.py => json_serdes/__init__.py} | 29 --
.../_helper/__init__.py} | 29 --
.../partitions/json_serdes/_helper/serdes_utils.py | 168 +++++++
.../partition_dto_serdes.py} | 35 +-
.../gravitino/dto/rel/partitions/partition_dto.py | 5 +-
clients/client-python/gravitino/utils/serdes.py | 1 +
.../unittests/dto/rel/test_partition_dto_serdes.py | 483 +++++++++++++++++++++
8 files changed, 668 insertions(+), 85 deletions(-)
diff --git a/clients/client-python/gravitino/api/types/json_serdes/base.py
b/clients/client-python/gravitino/api/types/json_serdes/base.py
index 853eeae66e..5dd89a0266 100644
--- a/clients/client-python/gravitino/api/types/json_serdes/base.py
+++ b/clients/client-python/gravitino/api/types/json_serdes/base.py
@@ -22,8 +22,9 @@ from dataclasses_json.core import Json
from gravitino.api.expressions.expression import Expression
from gravitino.api.types.types import Type
+from gravitino.dto.rel.partitions.partition_dto import PartitionDTO
-GravitinoTypeT = TypeVar("GravitinoTypeT", bound=Union[Expression, Type])
+GravitinoTypeT = TypeVar("GravitinoTypeT", bound=Union[Expression, Type,
PartitionDTO])
class JsonSerializable(ABC, Generic[GravitinoTypeT]):
diff --git
a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/__init__.py
similarity index 52%
copy from clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
copy to
clients/client-python/gravitino/dto/rel/partitions/json_serdes/__init__.py
index cbfddd0ab5..13a83393a9 100644
--- a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
+++ b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/__init__.py
@@ -14,32 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-
-from abc import abstractmethod
-from enum import Enum
-
-from gravitino.api.expressions.partitions.partition import Partition
-
-
-class PartitionDTO(Partition):
- """Represents a Partition Data Transfer Object (DTO) that implements the
Partition interface."""
-
- class Type(Enum):
- """Type of the partition."""
-
- RANGE = "range"
- """The range partition type."""
- LIST = "list"
- """The list partition type."""
- IDENTITY = "identity"
- """The identity partition type."""
-
- @abstractmethod
- def type(self) -> Type:
- """Gets the type of the partition.
-
- Returns:
- Type: The type of the partition.
- """
- pass # pragma: no cover
diff --git
a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/__init__.py
similarity index 52%
copy from clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
copy to
clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/__init__.py
index cbfddd0ab5..13a83393a9 100644
--- a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
+++
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/__init__.py
@@ -14,32 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-
-from abc import abstractmethod
-from enum import Enum
-
-from gravitino.api.expressions.partitions.partition import Partition
-
-
-class PartitionDTO(Partition):
- """Represents a Partition Data Transfer Object (DTO) that implements the
Partition interface."""
-
- class Type(Enum):
- """Type of the partition."""
-
- RANGE = "range"
- """The range partition type."""
- LIST = "list"
- """The list partition type."""
- IDENTITY = "identity"
- """The identity partition type."""
-
- @abstractmethod
- def type(self) -> Type:
- """Gets the type of the partition.
-
- Returns:
- Type: The type of the partition.
- """
- pass # pragma: no cover
diff --git
a/clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/serdes_utils.py
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/serdes_utils.py
new file mode 100644
index 0000000000..6c8732a4ad
--- /dev/null
+++
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/_helper/serdes_utils.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from contextlib import suppress
+from typing import Any, Dict, List, cast
+
+from gravitino.dto.rel.expressions.json_serdes._helper.serdes_utils import (
+ SerdesUtils as ExpressionsSerdesUtils,
+)
+from gravitino.dto.rel.expressions.literal_dto import LiteralDTO
+from gravitino.dto.rel.partitions.identity_partition_dto import
IdentityPartitionDTO
+from gravitino.dto.rel.partitions.list_partition_dto import ListPartitionDTO
+from gravitino.dto.rel.partitions.partition_dto import PartitionDTO
+from gravitino.dto.rel.partitions.range_partition_dto import RangePartitionDTO
+from gravitino.exceptions.base import IllegalArgumentException
+from gravitino.utils.precondition import Precondition
+from gravitino.utils.serdes import SerdesUtilsBase
+
+
+class SerdesUtils(SerdesUtilsBase):
+ @classmethod
+ def write_partition(cls, value: PartitionDTO) -> Dict[str, Any]:
+ result = {
+ cls.PARTITION_TYPE: value.type().value,
+ cls.PARTITION_NAME: value.name(),
+ }
+ dto_data = {}
+ dto_type = value.type()
+
+ if dto_type not in PartitionDTO.Type:
+ raise IOError(f"Unknown partition type: {value.type()}")
+
+ if dto_type is PartitionDTO.Type.IDENTITY:
+ dto = cast(IdentityPartitionDTO, value)
+ dto_data[cls.FIELD_NAMES] = dto.field_names()
+ dto_data[cls.IDENTITY_PARTITION_VALUES] = [
+ ExpressionsSerdesUtils.write_function_arg(arg=arg)
+ for arg in dto.values()
+ ]
+
+ if dto_type is PartitionDTO.Type.LIST:
+ dto = cast(ListPartitionDTO, value)
+ dto_data[cls.LIST_PARTITION_LISTS] = [
+ [ExpressionsSerdesUtils.write_function_arg(arg=arg) for arg in
args]
+ for args in dto.lists()
+ ]
+
+ if dto_type is PartitionDTO.Type.RANGE:
+ dto = cast(RangePartitionDTO, value)
+ dto_data[cls.RANGE_PARTITION_UPPER] = (
+ ExpressionsSerdesUtils.write_function_arg(arg=dto.upper())
+ )
+ dto_data[cls.RANGE_PARTITION_LOWER] = (
+ ExpressionsSerdesUtils.write_function_arg(arg=dto.lower())
+ )
+
+ dto_data[cls.PARTITION_PROPERTIES] = value.properties()
+ result.update(dto_data)
+ return result
+
+ @classmethod
+ def read_partition(cls, data: Dict[str, Any]) -> PartitionDTO:
+ Precondition.check_argument(
+ isinstance(data, Dict) and len(data) > 0,
+ f"Partition must be a valid JSON object, but found: {data}",
+ )
+ Precondition.check_argument(
+ data.get(cls.PARTITION_TYPE) is not None,
+ f"Partition must have a type field, but found: {data}",
+ )
+ dto_type = None
+ with suppress(ValueError):
+ dto_type = PartitionDTO.Type(data[cls.PARTITION_TYPE])
+
+ if dto_type is PartitionDTO.Type.IDENTITY:
+ Precondition.check_argument(
+ isinstance(data.get(cls.FIELD_NAMES), List),
+ f"Identity partition must have array of fieldNames, but found:
{data}",
+ )
+ Precondition.check_argument(
+ isinstance(data.get(cls.IDENTITY_PARTITION_VALUES), List),
+ f"Identity partition must have array of values, but found:
{data}",
+ )
+ return IdentityPartitionDTO(
+ name=data[cls.PARTITION_NAME],
+ field_names=data[cls.FIELD_NAMES],
+ values=[
+ cast(
+ LiteralDTO,
ExpressionsSerdesUtils.read_function_arg(data=value)
+ )
+ for value in data[cls.IDENTITY_PARTITION_VALUES]
+ ],
+ properties=data.get(cls.PARTITION_PROPERTIES, {}),
+ )
+
+ if dto_type is PartitionDTO.Type.LIST:
+ Precondition.check_argument(
+ cls.PARTITION_NAME in data,
+ f"List partition must have name, but found: {data}",
+ )
+ Precondition.check_argument(
+ isinstance(data.get(cls.LIST_PARTITION_LISTS), List),
+ f"List partition must have array of lists, but found: {data}",
+ )
+ return ListPartitionDTO(
+ name=data[cls.PARTITION_NAME],
+ lists=[
+ [
+ cast(
+ LiteralDTO,
+
ExpressionsSerdesUtils.read_function_arg(data=value),
+ )
+ for value in values
+ ]
+ for values in data[cls.LIST_PARTITION_LISTS]
+ ],
+ properties=data.get(cls.PARTITION_PROPERTIES, {}),
+ )
+
+ if dto_type is PartitionDTO.Type.RANGE:
+ Precondition.check_argument(
+ cls.PARTITION_NAME in data,
+ f"Range partition must have name, but found: {data}",
+ )
+ Precondition.check_argument(
+ cls.RANGE_PARTITION_UPPER in data,
+ f"Range partition must have upper, but found: {data}",
+ )
+ Precondition.check_argument(
+ cls.RANGE_PARTITION_LOWER in data,
+ f"Range partition must have lower, but found: {data}",
+ )
+ upper = cast(
+ LiteralDTO,
+ ExpressionsSerdesUtils.read_function_arg(
+ data[cls.RANGE_PARTITION_UPPER]
+ ),
+ )
+ lower = cast(
+ LiteralDTO,
+ ExpressionsSerdesUtils.read_function_arg(
+ data[cls.RANGE_PARTITION_LOWER]
+ ),
+ )
+ return RangePartitionDTO(
+ name=data[cls.PARTITION_NAME],
+ upper=upper,
+ lower=lower,
+ properties=data.get(cls.PARTITION_PROPERTIES, {}),
+ )
+
+ raise IllegalArgumentException(
+ f"Unknown partition type: {data[cls.PARTITION_TYPE]}"
+ )
diff --git
a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/partition_dto_serdes.py
similarity index 52%
copy from clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
copy to
clients/client-python/gravitino/dto/rel/partitions/json_serdes/partition_dto_serdes.py
index cbfddd0ab5..383ba8d3a9 100644
--- a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
+++
b/clients/client-python/gravitino/dto/rel/partitions/json_serdes/partition_dto_serdes.py
@@ -15,31 +15,18 @@
# specific language governing permissions and limitations
# under the License.
+from typing import Any, Dict
-from abc import abstractmethod
-from enum import Enum
+from gravitino.api.types.json_serdes.base import JsonSerializable
+from gravitino.dto.rel.partitions.json_serdes._helper.serdes_utils import
SerdesUtils
+from gravitino.dto.rel.partitions.partition_dto import PartitionDTO
-from gravitino.api.expressions.partitions.partition import Partition
+class PartitionDTOSerdes(JsonSerializable[PartitionDTO]):
+ @classmethod
+ def serialize(cls, data_type: PartitionDTO) -> Dict[str, Any]:
+ return SerdesUtils.write_partition(data_type)
-class PartitionDTO(Partition):
- """Represents a Partition Data Transfer Object (DTO) that implements the
Partition interface."""
-
- class Type(Enum):
- """Type of the partition."""
-
- RANGE = "range"
- """The range partition type."""
- LIST = "list"
- """The list partition type."""
- IDENTITY = "identity"
- """The identity partition type."""
-
- @abstractmethod
- def type(self) -> Type:
- """Gets the type of the partition.
-
- Returns:
- Type: The type of the partition.
- """
- pass # pragma: no cover
+ @classmethod
+ def deserialize(cls, data: Dict[str, Any]) -> PartitionDTO:
+ return SerdesUtils.read_partition(data)
diff --git
a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
b/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
index cbfddd0ab5..40047d6901 100644
--- a/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
+++ b/clients/client-python/gravitino/dto/rel/partitions/partition_dto.py
@@ -17,7 +17,7 @@
from abc import abstractmethod
-from enum import Enum
+from enum import Enum, unique
from gravitino.api.expressions.partitions.partition import Partition
@@ -25,7 +25,8 @@ from gravitino.api.expressions.partitions.partition import
Partition
class PartitionDTO(Partition):
"""Represents a Partition Data Transfer Object (DTO) that implements the
Partition interface."""
- class Type(Enum):
+ @unique
+ class Type(str, Enum):
"""Type of the partition."""
RANGE = "range"
diff --git a/clients/client-python/gravitino/utils/serdes.py
b/clients/client-python/gravitino/utils/serdes.py
index e67f03edc2..ed3371dbe8 100644
--- a/clients/client-python/gravitino/utils/serdes.py
+++ b/clients/client-python/gravitino/utils/serdes.py
@@ -53,6 +53,7 @@ class SerdesUtilsBase:
PARTITION_TYPE: Final[str] = "type"
PARTITION_NAME: Final[str] = "name"
+ PARTITION_PROPERTIES: Final[str] = "properties"
FIELD_NAMES: Final[str] = "fieldNames"
IDENTITY_PARTITION_VALUES: Final[str] = "values"
LIST_PARTITION_LISTS: Final[str] = "lists"
diff --git
a/clients/client-python/tests/unittests/dto/rel/test_partition_dto_serdes.py
b/clients/client-python/tests/unittests/dto/rel/test_partition_dto_serdes.py
new file mode 100644
index 0000000000..464c705ad8
--- /dev/null
+++ b/clients/client-python/tests/unittests/dto/rel/test_partition_dto_serdes.py
@@ -0,0 +1,483 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import unittest
+from enum import Enum
+from typing import cast
+from unittest.mock import patch
+
+from gravitino.api.types.types import Types
+from gravitino.dto.rel.expressions.json_serdes._helper.serdes_utils import (
+ SerdesUtils as ExpressionSerdesUtils,
+)
+from gravitino.dto.rel.expressions.literal_dto import LiteralDTO
+from gravitino.dto.rel.partitions.identity_partition_dto import
IdentityPartitionDTO
+from gravitino.dto.rel.partitions.json_serdes._helper.serdes_utils import
SerdesUtils
+from gravitino.dto.rel.partitions.json_serdes.partition_dto_serdes import (
+ PartitionDTOSerdes,
+)
+from gravitino.dto.rel.partitions.list_partition_dto import ListPartitionDTO
+from gravitino.dto.rel.partitions.partition_dto import PartitionDTO
+from gravitino.dto.rel.partitions.range_partition_dto import RangePartitionDTO
+from gravitino.exceptions.base import IllegalArgumentException
+
+
+class MockPartitionDTOType(str, Enum):
+ INVALID_PARTITION_TYPE = "invalid_partition_type"
+
+
+class TestPartitionSerdesUtils(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls) -> None:
+ cls.literal_values = {
+ "upper": LiteralDTO.builder()
+ .with_data_type(Types.IntegerType.get())
+ .with_value(value="0")
+ .build(),
+ "lower": LiteralDTO.builder()
+ .with_data_type(Types.IntegerType.get())
+ .with_value(value="100")
+ .build(),
+ }
+ cls.field_names = [["upper"], ["lower"]]
+ cls.properties = {"key1": "value1", "key2": "value2"}
+ cls.identity_partition_dto = IdentityPartitionDTO(
+ name="test_identity_partition",
+ values=list(cls.literal_values.values()),
+ field_names=cls.field_names,
+ properties=cls.properties,
+ )
+ cls.range_partition_dto = RangePartitionDTO(
+ name="test_range_partition",
+ properties=cls.properties,
+ **cls.literal_values,
+ )
+ cls.list_partition_dto = ListPartitionDTO(
+ name="test_list_partition",
+ lists=[[literal_value] for literal_value in
cls.literal_values.values()],
+ properties=cls.properties,
+ )
+ cls.partition_dtos = {
+ PartitionDTO.Type.IDENTITY: cls.identity_partition_dto,
+ PartitionDTO.Type.LIST: cls.list_partition_dto,
+ PartitionDTO.Type.RANGE: cls.range_partition_dto,
+ }
+
+ def test_write_partition_dto_unknown_type(self):
+ """Test that unknown partition types should raise IOError."""
+
+ with patch.object(
+ self.identity_partition_dto,
+ "type",
+ return_value=MockPartitionDTOType.INVALID_PARTITION_TYPE,
+ ):
+ self.assertRaises(
+ IOError,
+ SerdesUtils.write_partition,
+ value=self.identity_partition_dto,
+ )
+
+ @patch(
+
"gravitino.dto.rel.expressions.json_serdes._helper.serdes_utils.SerdesUtils.write_function_arg"
+ )
+ def test_write_partition_with_mocked_expression_serdes(
+ self, mock_write_function_arg
+ ):
+ """Test write_partition with mocked expression serialization.
+
+ To make sure the number of call to method
`ExpressionsSerdesUtils.write_function_arg` are
+ identical to the number of `LiteralDTO`s.
+ """
+
+ for partition_dto in self.partition_dtos.values():
+ mock_write_function_arg.reset_mock()
+ mock_write_function_arg_return = {"mocked": "function_arg"}
+ mock_write_function_arg.return_value =
mock_write_function_arg_return
+ partition_dto_type = partition_dto.type()
+
+ result = SerdesUtils.write_partition(partition_dto)
+
+ self.assertEqual(
+ mock_write_function_arg.call_count, len(self.literal_values)
+ )
+ if partition_dto_type is PartitionDTO.Type.IDENTITY:
+ self.assertEqual(
+ result[SerdesUtils.IDENTITY_PARTITION_VALUES],
+ [mock_write_function_arg_return] *
len(self.literal_values),
+ )
+ if partition_dto_type is PartitionDTO.Type.LIST:
+ self.assertEqual(
+ result[SerdesUtils.LIST_PARTITION_LISTS],
+ [[mock_write_function_arg_return]] *
len(self.literal_values),
+ )
+ if partition_dto_type is PartitionDTO.Type.RANGE:
+ self.assertEqual(
+ [
+ result[SerdesUtils.RANGE_PARTITION_LOWER],
+ result[SerdesUtils.RANGE_PARTITION_UPPER],
+ ],
+ [mock_write_function_arg_return] *
len(self.literal_values),
+ )
+
+ def test_write_partition_dto(self):
+ """Test writing PartitionDTOs"""
+
+ for partition_dto_type, partition_dto in self.partition_dtos.items():
+ result = SerdesUtils.write_partition(partition_dto)
+
+ self.assertEqual(
+ result[SerdesUtils.PARTITION_TYPE], partition_dto_type.value
+ )
+ self.assertEqual(
+ result[SerdesUtils.PARTITION_NAME],
+ f"test_{partition_dto_type.value}_partition",
+ )
+ if partition_dto_type is PartitionDTO.Type.IDENTITY:
+ self.assertEqual(result[SerdesUtils.FIELD_NAMES],
self.field_names)
+ self.assertIn(SerdesUtils.IDENTITY_PARTITION_VALUES, result)
+ self.assertListEqual(
+ result[SerdesUtils.IDENTITY_PARTITION_VALUES],
+ [
+ ExpressionSerdesUtils.write_function_arg(literal_value)
+ for literal_value in self.literal_values.values()
+ ],
+ )
+ if partition_dto_type is PartitionDTO.Type.LIST:
+ self.assertListEqual(
+ result[SerdesUtils.LIST_PARTITION_LISTS],
+ [
+
[ExpressionSerdesUtils.write_function_arg(literal_value)]
+ for literal_value in self.literal_values.values()
+ ],
+ )
+ if partition_dto_type is PartitionDTO.Type.RANGE:
+ self.assertEqual(
+ result[SerdesUtils.RANGE_PARTITION_LOWER],
+ ExpressionSerdesUtils.write_function_arg(
+
arg=self.literal_values[SerdesUtils.RANGE_PARTITION_LOWER]
+ ),
+ )
+ self.assertEqual(
+ result[SerdesUtils.RANGE_PARTITION_UPPER],
+ ExpressionSerdesUtils.write_function_arg(
+
arg=self.literal_values[SerdesUtils.RANGE_PARTITION_UPPER]
+ ),
+ )
+ self.assertDictEqual(
+ result[SerdesUtils.PARTITION_PROPERTIES],
partition_dto.properties()
+ )
+
+ def test_write_partition_empty_values(self):
+ """Test writing partition with empty values."""
+
+ empty_partition = IdentityPartitionDTO(
+ name="empty_partition", values=[], field_names=[], properties={}
+ )
+
+ result = SerdesUtils.write_partition(empty_partition)
+
+ self.assertEqual(result[SerdesUtils.PARTITION_NAME], "empty_partition")
+ self.assertEqual(result[SerdesUtils.FIELD_NAMES], [])
+ self.assertEqual(result[SerdesUtils.IDENTITY_PARTITION_VALUES], [])
+
+ def test_read_partition_dto_invalid_type(self):
+ data = {SerdesUtils.PARTITION_TYPE: "invalid_type"}
+ with self.assertRaises(IllegalArgumentException):
+ SerdesUtils.read_partition(data=data)
+
+ def test_read_partition_dto_invalid_data(self):
+ invalid_json_data = (None, {})
+ for data in invalid_json_data:
+ self.assertRaisesRegex(
+ IllegalArgumentException,
+ "Partition must be a valid JSON object",
+ SerdesUtils.read_partition,
+ data=data,
+ )
+
+ self.assertRaisesRegex(
+ IllegalArgumentException,
+ "Partition must have a type field",
+ SerdesUtils.read_partition,
+ data={"invalid_field": "invalid_value"},
+ )
+
+ def test_read_partition_invalid_identity(self):
+ identity_data_base = {
+ SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.IDENTITY.value
+ }
+ invalid_field_names_data = (
+ identity_data_base,
+ {**identity_data_base, **{SerdesUtils.FIELD_NAMES:
"invalid_field_names"}},
+ )
+ invalid_values_data = (
+ {**identity_data_base, **{SerdesUtils.FIELD_NAMES: []}},
+ {
+ **identity_data_base,
+ **{
+ SerdesUtils.FIELD_NAMES: [],
+ SerdesUtils.IDENTITY_PARTITION_VALUES: "invalid_values",
+ },
+ },
+ )
+ for data in invalid_field_names_data:
+ self.assertRaisesRegex(
+ IllegalArgumentException,
+ "Identity partition must have array of fieldNames",
+ SerdesUtils.read_partition,
+ data=data,
+ )
+ for data in invalid_values_data:
+ self.assertRaisesRegex(
+ IllegalArgumentException,
+ "Identity partition must have array of values",
+ SerdesUtils.read_partition,
+ data=data,
+ )
+
+ def test_read_partition_invalid_list(self):
+ list_data_base = {
+ SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.LIST.value,
+ SerdesUtils.PARTITION_NAME: "list_partition",
+ }
+ invalid_partition_name = {
+ SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.LIST.value
+ }
+ invalid_lists_data = (
+ list_data_base,
+ {**list_data_base, **{SerdesUtils.LIST_PARTITION_LISTS:
"invalid"}},
+ )
+ self.assertRaisesRegex(
+ IllegalArgumentException,
+ "List partition must have name",
+ SerdesUtils.read_partition,
+ data=invalid_partition_name,
+ )
+ for invalid_data in invalid_lists_data:
+ self.assertRaisesRegex(
+ IllegalArgumentException,
+ "List partition must have array of lists",
+ SerdesUtils.read_partition,
+ data=invalid_data,
+ )
+
+ def test_read_partition_invalid_range(self):
+ invalid_partition_name = {
+ SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.RANGE.value
+ }
+ invalid_range_data_upper = {
+ SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.RANGE.value,
+ SerdesUtils.PARTITION_NAME: "range_partition",
+ }
+ invalid_range_data_lower = {
+ SerdesUtils.PARTITION_TYPE: PartitionDTO.Type.RANGE.value,
+ SerdesUtils.PARTITION_NAME: "range_partition",
+ SerdesUtils.RANGE_PARTITION_UPPER: "upper",
+ }
+ self.assertRaisesRegex(
+ IllegalArgumentException,
+ "Range partition must have name",
+ SerdesUtils.read_partition,
+ data=invalid_partition_name,
+ )
+ self.assertRaisesRegex(
+ IllegalArgumentException,
+ "Range partition must have upper",
+ SerdesUtils.read_partition,
+ data=invalid_range_data_upper,
+ )
+ self.assertRaisesRegex(
+ IllegalArgumentException,
+ "Range partition must have lower",
+ SerdesUtils.read_partition,
+ data=invalid_range_data_lower,
+ )
+
+ def test_read_partition_dto(self):
+ for partition_dto_type, partition_dto in self.partition_dtos.items():
+ result = SerdesUtils.write_partition(partition_dto)
+ partition_dto_read = SerdesUtils.read_partition(result)
+
+ self.assertEqual(partition_dto.name(), partition_dto_read.name())
+ self.assertEqual(partition_dto.type(), partition_dto_read.type())
+ if partition_dto_type is PartitionDTO.Type.IDENTITY:
+ dto = cast(IdentityPartitionDTO, partition_dto_read)
+ self.assertListEqual(partition_dto.field_names(),
dto.field_names())
+ self.assertListEqual(partition_dto.values(), dto.values())
+ self.assertEqual(partition_dto.properties(), dto.properties())
+
+ if partition_dto_type is PartitionDTO.Type.LIST:
+ dto = cast(ListPartitionDTO, partition_dto_read)
+ self.assertListEqual(partition_dto.lists(), dto.lists())
+ self.assertEqual(partition_dto.properties(), dto.properties())
+
+ if partition_dto_type is PartitionDTO.Type.RANGE:
+ dto = cast(RangePartitionDTO, partition_dto_read)
+ self.assertEqual(partition_dto.lower(), dto.lower())
+ self.assertEqual(partition_dto.upper(), dto.upper())
+ self.assertEqual(partition_dto.properties(), dto.properties())
+
+ def test_partition_dto_serdes(self):
+ for partition_dto_type, partition_dto in self.partition_dtos.items():
+ serialized_data = PartitionDTOSerdes.serialize(partition_dto)
+ deserialized_partition_dto =
PartitionDTOSerdes.deserialize(serialized_data)
+
+ self.assertEqual(partition_dto.name(),
deserialized_partition_dto.name())
+ self.assertEqual(partition_dto.type(),
deserialized_partition_dto.type())
+ if partition_dto_type is PartitionDTO.Type.IDENTITY:
+ dto = cast(IdentityPartitionDTO, deserialized_partition_dto)
+ self.assertListEqual(partition_dto.field_names(),
dto.field_names())
+ self.assertListEqual(partition_dto.values(), dto.values())
+ self.assertEqual(partition_dto.properties(), dto.properties())
+
+ if partition_dto_type is PartitionDTO.Type.LIST:
+ dto = cast(ListPartitionDTO, deserialized_partition_dto)
+ self.assertListEqual(partition_dto.lists(), dto.lists())
+ self.assertEqual(partition_dto.properties(), dto.properties())
+
+ if partition_dto_type is PartitionDTO.Type.RANGE:
+ dto = cast(RangePartitionDTO, deserialized_partition_dto)
+ self.assertEqual(partition_dto.lower(), dto.lower())
+ self.assertEqual(partition_dto.upper(), dto.upper())
+ self.assertEqual(partition_dto.properties(), dto.properties())
+
+ def test_partition_dto_serdes_identity_from_json_string(self):
+ """Tests deserialize `IdentityPartitionDTO` from JSON string."""
+
+ expected_json_string = """
+ {
+ "type": "identity",
+ "name": "test_identity_partition",
+ "fieldNames": [
+ [
+ "upper"
+ ],
+ [
+ "lower"
+ ]
+ ],
+ "values": [
+ {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "0"
+ },
+ {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "100"
+ }
+ ],
+ "properties": {
+ "key1": "value1",
+ "key2": "value2"
+ }
+ }
+ """
+ expected_serialized = json.loads(expected_json_string)
+ deserialized_dto = PartitionDTOSerdes.deserialize(expected_serialized)
+ self.assertTrue(deserialized_dto == self.identity_partition_dto)
+
+ def test_partition_dto_serdes_list_from_json_string(self):
+ """Tests deserialize `ListPartitionDTO` from JSON string."""
+
+ additional_literal_values = {
+ "upper": LiteralDTO.builder()
+ .with_data_type(Types.IntegerType.get())
+ .with_value(value="101")
+ .build(),
+ "lower": LiteralDTO.builder()
+ .with_data_type(Types.IntegerType.get())
+ .with_value(value="200")
+ .build(),
+ }
+ list_partition_dto = ListPartitionDTO(
+ name="test_list_partition",
+ lists=[
+ list(self.literal_values.values()),
+ list(additional_literal_values.values()),
+ ],
+ properties={},
+ )
+ expected_json_string = """
+ {
+ "type": "list",
+ "name": "test_list_partition",
+ "lists": [
+ [
+ {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "0"
+ },
+ {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "100"
+ }
+ ],
+ [
+ {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "101"
+ },
+ {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "200"
+ }
+ ]
+ ]
+ }
+ """
+ expected_serialized = json.loads(expected_json_string)
+ deserialized_dto = cast(
+ ListPartitionDTO,
PartitionDTOSerdes.deserialize(expected_serialized)
+ )
+ self.assertIs(deserialized_dto.type(), list_partition_dto.type())
+ self.assertEqual(deserialized_dto.name(), list_partition_dto.name())
+ self.assertListEqual(deserialized_dto.lists(),
list_partition_dto.lists())
+ self.assertEqual(deserialized_dto.properties(), {})
+
+ def test_partition_dto_serdes_range_from_json_string(self):
+ """Tests deserialize `RangePartitionDTO` from JSON string."""
+
+ expected_json_string = """
+ {
+ "type": "range",
+ "name": "test_range_partition",
+ "upper": {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "0"
+ },
+ "lower": {
+ "type": "literal",
+ "dataType": "integer",
+ "value": "100"
+ },
+ "properties": {
+ "key1": "value1",
+ "key2": "value2"
+ }
+ }
+ """
+ expected_serialized = json.loads(expected_json_string)
+ deserialized_dto = PartitionDTOSerdes.deserialize(expected_serialized)
+ self.assertTrue(deserialized_dto == self.range_partition_dto)