amoghrajesh commented on code in PR #66002:
URL: https://github.com/apache/airflow/pull/66002#discussion_r3194648177
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -2295,6 +2295,41 @@ def execute_complete(self):
assert ti.state == TaskInstanceState.DEFERRED
[email protected]_serialized_dag
+def test_schedule_tis_start_trigger_next_kwargs_round_trip(dag_maker, session):
+ """next_kwargs with encoded values (timedelta) must survive the defer_task
round-trip."""
+ import datetime
+
+ from airflow.sdk.serde import deserialize
+
+ class TestOperator(BaseOperator):
+ start_trigger_args = StartTriggerArgs(
+ trigger_cls="airflow.triggers.testing.SuccessTrigger",
+ trigger_kwargs={},
+ next_method="execute_complete",
+ next_kwargs={"delay": datetime.timedelta(seconds=30)},
+ timeout=None,
+ )
+ start_from_trigger = True
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ def execute_complete(self):
+ pass
+
+ with dag_maker(session=session):
+ TestOperator(task_id="test_task")
+
+ dr: DagRun = dag_maker.create_dagrun()
+ ti = dr.get_task_instance("test_task")
+ ti.task = dr.dag.get_task("test_task")
+ dr.schedule_tis((ti,), session=session)
+
+ assert ti.state == TaskInstanceState.DEFERRED
+ assert deserialize(ti.next_kwargs) == {"delay":
datetime.timedelta(seconds=30)}
+
+
Review Comment:
Added `test_schedule_tis_start_trigger_kwargs_e2e` which does this: goes
from DAG definition -> schedule_tis (scheduler) -> Trigger row ->
trigger_row.kwargs and asserts the `timedelta` round-trips back to the original
Python object.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]