amoghrajesh commented on code in PR #59711:
URL: https://github.com/apache/airflow/pull/59711#discussion_r2651154011


##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -417,15 +426,14 @@ def handle_event_submit(event: TriggerEvent, *, 
task_instance: TaskInstance, ses
     :param task_instance: The task instance to handle the submit event for.
     :param session: The session to be used for the database callback sink.
     """
+    from airflow.sdk.serde import serialize
     from airflow.utils.state import TaskInstanceState
 
     # Get the next kwargs of the task instance, or an empty dictionary if it 
doesn't exist
     next_kwargs = task_instance.next_kwargs or {}
 
-    # Add the event's payload into the kwargs for the task
-    next_kwargs["event"] = event.payload
-
     # Update the next kwargs of the task instance
+    next_kwargs["event"] = serialize(event.payload)

Review Comment:
   Created this dag:
   
   ```python
   from airflow import DAG
   from airflow.providers.standard.sensors.date_time import DateTimeSensorAsync
   from airflow.sdk import Context
   from datetime import datetime
   import pendulum
   
   
   class MyCustomSensor(DateTimeSensorAsync):
   
       def execute(self, context: Context):
           from airflow.providers.standard.triggers.temporal import 
DateTimeTrigger
   
           target_time = pendulum.parse(self.target_time)
   
           self.defer(
               trigger=DateTimeTrigger(moment=target_time),
               method_name="execute_complete",
               kwargs={
                   "numbers": [1, 2, 3, 4, 5],
                   "numbers_in_tuple": (6, 7, 8, 9, 10),
                   "message": "hello from deferred task",
                   "timestamp": pendulum.now("UTC"),
               },
           )
   
       def execute_complete(self, context: Context, event=None, **kwargs):
           """Log the received data."""
           self.log.info(f"Event: {event}")
           self.log.info(f"Numbers: {kwargs.get('numbers')}")
           self.log.info(f"Numbers in tuple are: 
{kwargs.get('numbers_in_tuple')}")
           self.log.info(f"Message: {kwargs.get('message')}")
           self.log.info(f"Timestamp: {kwargs.get('timestamp')}")
           return True
   
   
   with DAG(
       "defer_datetime",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
   ):
       MyCustomSensor(
           task_id="mytask",
           target_time="""{{ macros.datetime.utcnow() + 
macros.timedelta(minutes=1) }}""",
       )
   
   ```
   
   1. Ran it from main branch (pre upgrade)
   2. DB has this for next_kwargs:
   ```python
   {"__var": {"message": "hello from deferred task", "numbers": [1, 2, 3, 4, 
5], "timestamp": {"__var": 1767018594.460406, "__type": "datetime"}, 
"numbers_in_tuple": {"__var": [6, 7, 8, 9, 10], "__type": "tuple"}}, "__type": 
"dict"}
   ```
   
   Shows that it is serialized by `BaseSerialization`
   
   3. Upgraded airflow without db reset
   4. Added some debugging statements:
   ```diff
   Subject: [PATCH] Adding a fixture for creating connections
   ---
   Index: airflow-core/src/airflow/models/trigger.py
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git a/airflow-core/src/airflow/models/trigger.py 
b/airflow-core/src/airflow/models/trigger.py
   --- a/airflow-core/src/airflow/models/trigger.py     (revision 
46f3f8f5082260fa096af9829918b5287187102f)
   +++ b/airflow-core/src/airflow/models/trigger.py     (date 1767018852363)
   @@ -432,8 +432,13 @@
        # Get the next kwargs of the task instance, or an empty dictionary if 
it doesn't exist
        next_kwargs = task_instance.next_kwargs or {}
    
   +    print("Next kwargs before update:", next_kwargs)
   +
        # Update the next kwargs of the task instance
        next_kwargs["event"] = serialize(event.payload)
   +
   +    print("Next kwargs after update:", next_kwargs)
   +
        task_instance.next_kwargs = next_kwargs
    
        # Remove ourselves as its trigger
   
   ```
   
   And due to this I was able to realise that one deser is missing due to which 
"event" was empty (added at wrong level)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to