This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push: new 71b9414 [Python] Support for Event containing Row field can be json serializable (#63) 71b9414 is described below commit 71b9414379d733a4aef333f02cdae2c7bda19792 Author: Kavishankarks <57652470+kavishanka...@users.noreply.github.com> AuthorDate: Mon Aug 11 15:03:27 2025 +0530 [Python] Support for Event containing Row field can be json serializable (#63) --- python/flink_agents/api/events/event.py | 29 +++++++++-- python/flink_agents/api/tests/test_event.py | 77 ++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 6 deletions(-) diff --git a/python/flink_agents/api/events/event.py b/python/flink_agents/api/events/event.py index 600de78..775f224 100644 --- a/python/flink_agents/api/events/event.py +++ b/python/flink_agents/api/events/event.py @@ -16,10 +16,16 @@ # limitations under the License. ################################################################################# from abc import ABC -from typing import Any +from typing import Any, Dict + +try: + from typing import override +except ImportError: + from typing_extensions import override from uuid import UUID, uuid4 from pydantic import BaseModel, Field, model_validator +from pydantic_core import PydanticSerializationError from pyflink.common import Row @@ -35,13 +41,26 @@ class Event(BaseModel, ABC, extra="allow"): id: UUID = Field(default_factory=uuid4) + @staticmethod + def __serialize_unknown(field: Any) -> Dict[str, Any]: + """Handle serialization of unknown types, specifically Row objects.""" + if isinstance(field, Row): + return {"type": "Row", "values": field._values} + else: + err_msg = f"Unable to serialize unknown type: {field.__class__}" + raise PydanticSerializationError(err_msg) + + @override + def model_dump_json(self, **kwargs: Any) -> str: + """Override model_dump_json to handle Row objects using fallback.""" + # Set fallback if not provided in kwargs + if 'fallback' not in kwargs: + kwargs['fallback'] = self.__serialize_unknown + return super().model_dump_json(**kwargs) + @model_validator(mode="after") def validate_extra(self) -> "Event": """Ensure init fields is serializable.""" - # TODO: support Event contains Row field be json serializable - for value in self.model_dump().values(): - if isinstance(value, Row): - return self self.model_dump_json() return self diff --git a/python/flink_agents/api/tests/test_event.py b/python/flink_agents/api/tests/test_event.py index 00c5d63..a14f1a4 100644 --- a/python/flink_agents/api/tests/test_event.py +++ b/python/flink_agents/api/tests/test_event.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################# -from typing import Type +from typing import Any, Type import pytest from pydantic import ValidationError @@ -47,3 +47,78 @@ def test_event_setattr_non_serializable() -> None: # noqa D103 def test_input_event_ignore_row_unserializable() -> None: # noqa D103 InputEvent(input=Row({"a": 1})) + + +def test_event_row_with_non_serializable_fails() -> None: # noqa D103 + with pytest.raises(ValidationError): + Event(row_field=Row({"a": 1}), non_serializable_field=Type[InputEvent]) + + +def test_event_multiple_rows_serializable() -> None: # noqa D103 + Event(row1=Row({"a": 1}), row2=Row({"b": 2}), normal_field="test") + + +def test_event_setattr_row_serializable() -> None: # noqa D103 + event = Event(a=1) + event.row_field = Row({"key": "value"}) + + +def test_event_json_serialization_with_row() -> None: # noqa D103 + event = InputEvent(input=Row({"test": "data"})) + json_str = event.model_dump_json() + assert "test" in json_str + assert "Row" in json_str + + +def test_efficient_row_serialization_with_fallback() -> None: + """Test that the new fallback-based serialization works efficiently.""" + row_data = {"a": 1, "b": "test", "c": [1, 2, 3]} + event = InputEvent(input=Row(row_data)) + + json_str = event.model_dump_json() + import json + parsed = json.loads(json_str) + + assert parsed["input"]["type"] == "Row" + assert parsed["input"]["values"] == [row_data] + assert "id" in parsed # UUID should be present + + def custom_fallback(obj: Any) -> dict[str, Any]: + if isinstance(obj, Row): + return {"custom_type": "CustomRow", "data": obj._values} + msg = "Unknown type" + raise ValueError(msg) + + custom_json = event.model_dump_json(fallback=custom_fallback) + custom_parsed = json.loads(custom_json) + + assert custom_parsed["input"]["custom_type"] == "CustomRow" + assert custom_parsed["input"]["data"] == [row_data] + + +def test_event_with_mixed_serializable_types() -> None: + """Test event with mix of normal and Row types.""" + event = InputEvent(input={ + "normal_data": {"key": "value"}, + "row_data": Row({"test": "data"}), + "list_data": [1, 2, 3], + "nested_row": {"inner": Row({"nested": True})} + }) + + json_str = event.model_dump_json() + + import json + parsed = json.loads(json_str) + + # Normal data should be serialized normally + assert parsed["input"]["normal_data"]["key"] == "value" + assert parsed["input"]["list_data"] == [1, 2, 3] + + # Row data should use fallback serializer + assert parsed["input"]["row_data"]["type"] == "Row" + assert parsed["input"]["nested_row"]["inner"]["type"] == "Row" + + +def test_input_event_ignore_row_unserializable() -> None: # noqa D103 + InputEvent(input=Row({"a": 1})) +