kaxil commented on code in PR #59711:
URL: https://github.com/apache/airflow/pull/59711#discussion_r2650977380
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -417,15 +426,14 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses
:param task_instance: The task instance to handle the submit event for.
:param session: The session to be used for the database callback sink.
"""
+ from airflow.sdk.serde import serialize
from airflow.utils.state import TaskInstanceState
# Get the next kwargs of the task instance, or an empty dictionary if it doesn't exist
next_kwargs = task_instance.next_kwargs or {}
- # Add the event's payload into the kwargs for the task
- next_kwargs["event"] = event.payload
-
# Update the next kwargs of the task instance
+ next_kwargs["event"] = serialize(event.payload)
Review Comment:
If I read the path correctly, this now serializes `event.payload` with serde
before adding it to `next_kwargs`. However, could `next_kwargs` already contain
data in the old BaseSerialization format (for tasks that were deferred before the upgrade)?
Here is an example scenario that I think is worth validating:
1. A task deferred before the upgrade has `next_kwargs` with keys in BaseSerialization format
2. The trigger fires after the upgrade and adds an "event" key in serde format to the dict
3. The worker tries to deserialize and gets a mixed-format dict
4. Some keys deserialize correctly, others fail or return the wrong types
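To make the scenario concrete, here is a rough toy model of it. None of this is the Airflow code: `old_encode`, `new_encode`, `new_decode` and the envelope keys are stand-ins I made up for illustration, and the pass-through behavior of `new_decode` is an assumption. Whether the real deserializer raises or silently passes such values through is exactly the part worth checking.

```py
from datetime import timedelta


# Toy stand-ins for the two formats (hypothetical, NOT the Airflow implementations).
def old_encode(value):
    """Pretend pre-upgrade encoder: wraps timedeltas in an old-style envelope."""
    if isinstance(value, timedelta):
        return {"__type": "timedelta", "__var": value.total_seconds()}
    return value


def new_encode(value):
    """Pretend post-upgrade encoder: wraps timedeltas in a new-style envelope."""
    if isinstance(value, timedelta):
        return {"__classname__": "datetime.timedelta", "__data__": value.total_seconds()}
    return value


def new_decode(value):
    """Pretend post-upgrade decoder: only understands the new-style envelope."""
    if isinstance(value, dict) and "__classname__" in value:
        return timedelta(seconds=value["__data__"])
    return value  # assumption: unknown envelopes pass through untouched


# 1. Task deferred before the upgrade: next_kwargs holds an old-format value.
next_kwargs = {"timeout": old_encode(timedelta(seconds=30))}

# 2. Trigger fires after the upgrade: "event" is added in the new format.
next_kwargs["event"] = new_encode(timedelta(seconds=5))

# 3. Worker decodes every value with the new decoder only.
decoded = {key: new_decode(value) for key, value in next_kwargs.items()}

# 4. "event" comes back as a timedelta, "timeout" is still a raw envelope dict.
print({key: type(value).__name__ for key, value in decoded.items()})
```

In this toy version nothing raises, which is the worse outcome: the task would receive a dict where it expects a `timedelta`.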
---
Maybe apply the same compat shim here? I'm unsure, but it is worth validating:
```py
try:
    next_kwargs = deserialize(next_kwargs_raw) if next_kwargs_raw else {}
except (ImportError, KeyError, AttributeError, TypeError):
    from airflow.serialization.serialized_objects import BaseSerialization

    next_kwargs = BaseSerialization.deserialize(next_kwargs_raw) if next_kwargs_raw else {}
```
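If it helps with validating, the shim could be pulled into a small helper that takes both decoders as arguments, so the fallback path can be unit tested without a database. This is only a sketch: `deserialize_with_fallback`, `toy_primary` and `toy_legacy` are hypothetical names, and the toy decoders stand in for the serde and BaseSerialization ones.

```py
from typing import Any, Callable

# Same exception set as in the shim above.
FALLBACK_ERRORS = (ImportError, KeyError, AttributeError, TypeError)


def deserialize_with_fallback(
    raw: Any,
    primary: Callable[[Any], Any],
    legacy: Callable[[Any], Any],
) -> Any:
    """Decode ``raw`` with ``primary``; on a known failure, retry with ``legacy``."""
    if not raw:
        return {}
    try:
        return primary(raw)
    except FALLBACK_ERRORS:
        return legacy(raw)


# Toy decoders (hypothetical stand-ins for the real ones).
def toy_primary(raw):
    return raw["__data__"]  # raises KeyError for anything but the new envelope


def toy_legacy(raw):
    return raw["__var"]


new_style = {"__data__": {"event": "done"}}
old_style = {"__var": {"event": "done"}}

result_new = deserialize_with_fallback(new_style, toy_primary, toy_legacy)
result_old = deserialize_with_fallback(old_style, toy_primary, toy_legacy)
result_empty = deserialize_with_fallback(None, toy_primary, toy_legacy)
```

One caveat with this shape: the fallback only kicks in when the primary decoder raises. If the primary decoder accepts the input and returns it unchanged, the caller still gets the wrong type, so a test for that case would be useful too.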