wenjin272 commented on PR #63: URL: https://github.com/apache/flink-agents/pull/63#issuecomment-3086539825
> It seems we are always trying the pydantic serialization first, and falling back to json serialization with a custom serializer if the first try fails.
>
> This approach leads to a few issues.
>
> * Inefficiency, for trying to serialize the object twice.
> * Potential inconsistency. Adding one row-type field in the nested object will entirely change the serializer.
>
> I think the most elegant approach might be making the pyflink `Row` a pydantic base model and overriding `model_dump_json()` for it. However, even if we make the change in Flink now, it will only be available in future Flink versions. Moreover, we'd better not affect Flink for Flink Agents until the latter is stabilized. Alternatively, we may consider modifying `Row` in Flink Agents with some monkey patches, as a temporary solution.
>
> WDYT? @Kavishankarks @wenjin272

I think it makes sense to avoid these issues.

First, I tried the monkey patch, but it raises an exception (a rough reconstruction of the attempt is sketched at the end of this comment):

```
TypeError: __bases__ assignment: 'BaseModel' deallocator differs from 'object'
```

Then I looked for a way to inject a custom serializer into `BaseModel.model_dump_json()`. Fortunately, `model_dump_json()` accepts exactly such a hook, the `fallback` parameter:

```
fallback: A function to call when an unknown value is encountered. If not provided, a
    [`PydanticSerializationError`][pydantic_core.PydanticSerializationError] error is raised.
```

So I modified the `Event` code like this:

```python
from abc import ABC
from typing import Any, Dict
from uuid import UUID, uuid4

from pydantic import BaseModel, Field, model_validator
from pydantic_core import PydanticSerializationError
from pyflink.common import Row
from typing_extensions import override


class Event(BaseModel, ABC, extra="allow"):
    """Base class for all event types in the system.

    Event allows extra properties, but these properties must be instances of
    BaseModel or json serializable.

    Attributes:
    ----------
    id : UUID
        Unique identifier for the event, automatically generated using uuid4.
    """

    id: UUID = Field(default_factory=uuid4)

    @staticmethod
    def __serialize_unknown(field: Any) -> Dict[str, Any]:
        # Convert Row objects to plain JSON-compatible data; reject anything else.
        if isinstance(field, Row):
            return {"type": "Row", "values": field._values}
        err_msg = f"Unable to serialize unknown type: {field.__class__}"
        raise PydanticSerializationError(err_msg)

    @override
    def model_dump_json(self, **kwargs: Any) -> str:
        """Override model_dump_json to handle Row objects."""
        return super().model_dump_json(fallback=self.__serialize_unknown, **kwargs)

    @model_validator(mode="after")
    def validate_extra(self) -> "Event":
        """Ensure the init fields are serializable."""
        self.model_dump_json()
        return self

    def __setattr__(self, name: str, value: Any) -> None:
        super().__setattr__(name, value)
        # Ensure the newly added property can be serialized.
        self.model_dump_json()
```

And it works:

```python
def test_inject_super_class_to_row() -> None:
    event = InputEvent(input=Row({"a": 1}))
    print(event.model_dump_json())

# output:
# {"id":"037d5556-4a99-4d5f-a3f4-c7ec12773dc0","input":{"type":"Row","values":[{"a":1}]}}
```

WDYT? @Kavishankarks @xintongsong
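For reference, the monkey patch I tried boils down to roughly the following (a reconstruction of the idea, not the exact code); CPython rejects the `__bases__` reassignment because `BaseModel`'s instance layout/deallocator is not compatible with `Row`'s current base `object`, which is exactly the TypeError quoted above:

```python
from pydantic import BaseModel
from pyflink.common import Row

# Attempt to make the existing Row class a pydantic model by rewriting its bases.
# This fails with:
#   TypeError: __bases__ assignment: 'BaseModel' deallocator differs from 'object'
Row.__bases__ = (BaseModel,)
```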
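And here is a minimal standalone sketch of the `fallback` hook itself, independent of the `Event` class, assuming a pydantic version whose `model_dump_json()` accepts `fallback` (see the doc excerpt above). `Point` is just a hypothetical stand-in for a non-pydantic type such as pyflink's `Row`:

```python
from typing import Any

from pydantic import BaseModel


class Point:
    """Plain, non-pydantic class standing in for pyflink's Row."""

    def __init__(self, x: int, y: int) -> None:
        self.x = x
        self.y = y


class Payload(BaseModel, extra="allow"):
    """Pydantic model that accepts arbitrary extra properties."""


def serialize_unknown(value: Any) -> Any:
    # Called by pydantic-core for values it cannot serialize on its own.
    if isinstance(value, Point):
        return {"type": "Point", "values": [value.x, value.y]}
    raise TypeError(f"Unable to serialize unknown type: {type(value)}")


payload = Payload(location=Point(1, 2))

# Without the fallback this would raise PydanticSerializationError;
# with it, the unknown value is converted first and then serialized.
print(payload.model_dump_json(fallback=serialize_unknown))
# roughly: {"location":{"type":"Point","values":[1,2]}}
```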