Copilot commented on code in PR #64714:
URL: https://github.com/apache/airflow/pull/64714#discussion_r3066474762


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1625,7 +1625,10 @@ def defer_task(self, session: Session = NEW_SESSION) -> 
bool:
             assert isinstance(self.task, Operator)
 
         if start_trigger_args := self.start_trigger_args:
+            from airflow.sdk.serde import serialize as serde_serialize
+
             trigger_kwargs = start_trigger_args.trigger_kwargs or {}
+            next_kwargs = serde_serialize(start_trigger_args.next_kwargs or {})

Review Comment:
   The inline import of `serde_serialize` inside `defer_task()` can make 
failures show up later at runtime and may be inconsistent with other imports in 
this module. If this isn’t required to avoid a circular import, prefer moving 
it to the module level; if it is required, add a short comment explaining the 
rationale (e.g., circular dependency avoidance) to make the local import 
intentional.



##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -2729,6 +2729,29 @@ def test_defer_task(create_task_instance):
     assert ti.trigger_timeout is None
 
 
+def test_defer_task_serializes_non_json_next_kwargs(create_task_instance):
+    from airflow.sdk.serde import deserialize
+    from airflow.triggers.base import StartTriggerArgs
+
+    session = mock.Mock(spec=["add", "flush"])
+    delay = datetime.timedelta(minutes=5)
+    start_at = timezone.utcnow()
+    ti = create_task_instance(
+        dag_id="test_defer_task_serializes_non_json_next_kwargs",
+        task_id="test_defer_task_serializes_non_json_next_kwargs_op",
+        start_from_trigger=True,
+        start_trigger_args=StartTriggerArgs(
+            trigger_cls="trigger_cls",
+            next_method="next_method",
+            trigger_kwargs={"key": "value"},
+            next_kwargs={"start_at": start_at, "delay": delay},
+        ),
+    )
+
+    assert ti.defer_task(session=session)
+    assert deserialize(ti.next_kwargs) == {"start_at": start_at, "delay": 
delay}

Review Comment:
   This test currently validates a serde roundtrip, but it doesn’t directly 
assert the regression condition: that `ti.next_kwargs` is JSON-serializable 
after `defer_task()`. Consider adding an assertion that JSON encoding succeeds 
(e.g., `json.dumps(ti.next_kwargs)` does not raise) to ensure non-JSON objects 
were actually converted before persistence.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to