amoghrajesh commented on code in PR #59711:
URL: https://github.com/apache/airflow/pull/59711#discussion_r2651154011
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -417,15 +426,14 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses
     :param task_instance: The task instance to handle the submit event for.
     :param session: The session to be used for the database callback sink.
     """
+    from airflow.sdk.serde import serialize
     from airflow.utils.state import TaskInstanceState

     # Get the next kwargs of the task instance, or an empty dictionary if it doesn't exist
     next_kwargs = task_instance.next_kwargs or {}
-    # Add the event's payload into the kwargs for the task
-    next_kwargs["event"] = event.payload
-
     # Update the next kwargs of the task instance
+    next_kwargs["event"] = serialize(event.payload)
Review Comment:
Created this dag:
```python
from airflow import DAG
from airflow.providers.standard.sensors.date_time import DateTimeSensorAsync
from airflow.sdk import Context
from datetime import datetime
import pendulum


class MyCustomSensor(DateTimeSensorAsync):
    def execute(self, context: Context):
        from airflow.providers.standard.triggers.temporal import DateTimeTrigger

        target_time = pendulum.parse(self.target_time)
        self.defer(
            trigger=DateTimeTrigger(moment=target_time),
            method_name="execute_complete",
            kwargs={
                "numbers": [1, 2, 3, 4, 5],
                "numbers_in_tuple": (6, 7, 8, 9, 10),
                "message": "hello from deferred task",
                "timestamp": pendulum.now("UTC"),
            },
        )

    def execute_complete(self, context: Context, event=None, **kwargs):
        """Log the received data."""
        self.log.info(f"Event: {event}")
        self.log.info(f"Numbers: {kwargs.get('numbers')}")
        self.log.info(f"Numbers in tuple are: {kwargs.get('numbers_in_tuple')}")
        self.log.info(f"Message: {kwargs.get('message')}")
        self.log.info(f"Timestamp: {kwargs.get('timestamp')}")
        return True


with DAG(
    "defer_datetime",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    MyCustomSensor(
        task_id="mytask",
        target_time="""{{ macros.datetime.utcnow() + macros.timedelta(minutes=1) }}""",
    )
```
1. Ran it from the `main` branch (pre-upgrade)
2. DB has this for next_kwargs:
```json
{
  "__var": {
    "message": "hello from deferred task",
    "numbers": [1, 2, 3, 4, 5],
    "timestamp": {"__var": 1767018594.460406, "__type": "datetime"},
    "numbers_in_tuple": {"__var": [6, 7, 8, 9, 10], "__type": "tuple"}
  },
  "__type": "dict"
}
```
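For illustration, the `__type`/`__var` envelope above can be unwrapped with a small helper. This is a minimal plain-Python sketch of the round-trip, not Airflow's actual code (the real logic lives in `BaseSerialization`); the handled type markers are just the ones visible in the stored value:

```python
from datetime import datetime, timezone


def unwrap(obj):
    """Recursively decode a BaseSerialization-style {"__type", "__var"} value.

    Illustrative only: covers just the markers seen in the stored next_kwargs.
    """
    if isinstance(obj, dict) and "__type" in obj and "__var" in obj:
        t, var = obj["__type"], obj["__var"]
        if t == "dict":
            return {k: unwrap(v) for k, v in var.items()}
        if t == "tuple":
            return tuple(unwrap(v) for v in var)
        if t == "datetime":
            # timestamps are stored as POSIX floats in UTC
            return datetime.fromtimestamp(var, tz=timezone.utc)
        return unwrap(var)  # fall through for markers not handled here
    if isinstance(obj, list):
        return [unwrap(v) for v in obj]
    return obj


# The value exactly as stored in the DB above
stored = {
    "__var": {
        "message": "hello from deferred task",
        "numbers": [1, 2, 3, 4, 5],
        "timestamp": {"__var": 1767018594.460406, "__type": "datetime"},
        "numbers_in_tuple": {"__var": [6, 7, 8, 9, 10], "__type": "tuple"},
    },
    "__type": "dict",
}
decoded = unwrap(stored)
```

Note how the tuple and the datetime only come back as their original types if each nested envelope is unwrapped, which is exactly what goes wrong when deserialization happens at the wrong level.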
This shows the payload was serialized by `BaseSerialization` (the `__type`/`__var` envelope).
3. Upgraded airflow without db reset
4. Added some debugging statements:
```diff
diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py
--- a/airflow-core/src/airflow/models/trigger.py	(revision 46f3f8f5082260fa096af9829918b5287187102f)
+++ b/airflow-core/src/airflow/models/trigger.py	(date 1767018852363)
@@ -432,8 +432,13 @@
     # Get the next kwargs of the task instance, or an empty dictionary if it doesn't exist
     next_kwargs = task_instance.next_kwargs or {}
+    print("Next kwargs before update:", next_kwargs)
+
     # Update the next kwargs of the task instance
     next_kwargs["event"] = serialize(event.payload)
+
+    print("Next kwargs after update:", next_kwargs)
+
     task_instance.next_kwargs = next_kwargs
     # Remove ourselves as its trigger
```
This let me see that one deserialization step was missing, which is why `"event"` came through empty: the value was added at the wrong nesting level.