wenjin272 commented on PR #63:
URL: https://github.com/apache/flink-agents/pull/63#issuecomment-3086539825

   > It seems we always try the pydantic serialization first, and fall back to
   > json serialization with a custom serializer if the first try fails.
   > 
   > This approach leads to a few issues.
   > 
   > * Inefficiency: the object may be serialized twice.
   > * Potential inconsistency. Adding one row-type field to a nested object
   >   will entirely change the serializer.
   > 
   > I think the most elegant approach might be making the pyflink `Row` a
   > pydantic base model and overriding `model_dump_json()` for it. However, even
   > if we make the change in Flink now, it will only be available in future Flink
   > versions. Moreover, we'd better not affect Flink for Flink Agents until the
   > latter is stabilized. Alternatively, we may consider modifying `Row` in Flink
   > Agents with some monkey patches, as a temporary solution.
   > 
   > WDYT? @Kavishankarks @wenjin272
   
   I think it makes sense to avoid these issues.
   
   First, I tried the monkey patch, but it raised an exception:
   ```
   TypeError: __bases__ assignment: 'BaseModel' deallocator differs from 'object'
   ```
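
For reference, this error does not seem to be specific to pydantic or pyflink. Below is a minimal sketch that assumes two hypothetical stand-in classes, `Model` (for `BaseModel`) and `Row` (for the pyflink `Row`, which derives directly from `object`). On CPython, reassigning `__bases__` of a class whose only base is `object` is rejected in the same way:

```python
class Model:
    """Hypothetical stand-in for pydantic's BaseModel (assumption, not the real class)."""


class Row:
    """Hypothetical stand-in for pyflink's Row: a plain class whose only base is object."""


def try_inject_base() -> str:
    """Attempt the monkey patch; return the error message, or '' if it succeeded."""
    try:
        # Try to make Row a subclass of Model after the fact.
        Row.__bases__ = (Model,)
    except TypeError as exc:
        return str(exc)
    return ""


if __name__ == "__main__":
    print(try_inject_base())
```

So swapping the base class of `Row` at runtime is not a workable monkey patch here, whatever the new base is.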
   
   Then I looked for a way to inject a custom serializer into the BaseModel
   `model_dump_json()`. Fortunately, `model_dump_json()` accepts a `fallback`
   parameter for exactly this purpose:
   
   ```
   fallback: A function to call when an unknown value is encountered. If not provided,
       a [`PydanticSerializationError`][pydantic_core.PydanticSerializationError] error is raised.
   ```
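
The same single-pass idea can be illustrated with the standard library alone. This is only an analogy, not the pydantic API: `json.dumps` has a `default` hook that, like `fallback`, is called only for values the encoder cannot handle itself. `FakeRow`, `serialize_unknown`, and `dump_event` are hypothetical names used for this sketch.

```python
import json
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class FakeRow:
    """Hypothetical stand-in for pyflink's Row (assumption, not the real class)."""

    values: List[Any] = field(default_factory=list)


def serialize_unknown(obj: Any) -> Any:
    """Hook invoked by json.dumps only for values it cannot encode natively."""
    if isinstance(obj, FakeRow):
        return {"type": "Row", "values": obj.values}
    raise TypeError(f"Unable to serialize unknown type: {obj.__class__}")


def dump_event(payload: dict) -> str:
    # One serialization pass: natively encodable values never reach the hook,
    # so there is no "try, fail, then retry with another serializer" step.
    return json.dumps(payload, default=serialize_unknown)


if __name__ == "__main__":
    print(dump_event({"input": FakeRow([{"a": 1}]), "n": 1}))
```

With a hook like this, adding a row-type field to a nested object changes only how that one value is encoded, not which serializer handles the whole object.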
   
   So I modified the Event code as follows:
   
   ```
   from abc import ABC
   from typing import Any, Dict
   from uuid import UUID, uuid4

   from pydantic import BaseModel, Field, model_validator
   from pydantic_core import PydanticSerializationError
   from pyflink.common import Row
   from typing_extensions import override


   class Event(BaseModel, ABC, extra="allow"):
       """Base class for all event types in the system.

       Events allow extra properties, but each one must be a BaseModel
       instance or JSON serializable.

       Attributes:
       ----------
       id : UUID
           Unique identifier for the event, automatically generated using uuid4.
       """

       id: UUID = Field(default_factory=uuid4)

       @staticmethod
       def __serialize_unknow(field: Any) -> Dict[str, Any]:
           # Called by pydantic only for values it cannot serialize itself.
           if isinstance(field, Row):
               return {"type": "Row", "values": field._values}
           err_msg = f"Unable to serialize unknown type: {field.__class__}"
           raise PydanticSerializationError(err_msg)

       @override
       def model_dump_json(self, **kwargs: Any) -> str:
           """Override model_dump_json to handle Row objects."""
           # Forward caller options instead of dropping them; the Row-aware
           # fallback is used unless the caller supplies one.
           kwargs.setdefault("fallback", self.__serialize_unknow)
           return super().model_dump_json(**kwargs)

       @model_validator(mode="after")
       def validate_extra(self) -> "Event":
           """Ensure the initial fields are serializable."""
           self.model_dump_json()
           return self

       def __setattr__(self, name: str, value: Any) -> None:
           super().__setattr__(name, value)
           # Ensure the added property can be serialized.
           self.model_dump_json()
   ```
   
   And it works:
   ```
   def test_inject_super_class_to_row() -> None:
       event = InputEvent(input=Row({"a": 1}))
       print(event.model_dump_json())
   
   # Output:
   # {"id":"037d5556-4a99-4d5f-a3f4-c7ec12773dc0","input":{"type":"Row","values":[{"a":1}]}}
   ```
   WDYT? @Kavishankarks @xintongsong 

