amoghrajesh commented on code in PR #66002:
URL: https://github.com/apache/airflow/pull/66002#discussion_r3194659478
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1658,7 +1658,9 @@ def defer_task(self, session: Session = NEW_SESSION) ->
bool:
self.state = TaskInstanceState.DEFERRED
self.trigger_id = trigger_row.id
self.next_method = start_trigger_args.next_method
- self.next_kwargs = start_trigger_args.next_kwargs or {}
+ from airflow.serialization.enums import stringify_encoding_keys
Review Comment:
Handled in a65df33af9
##########
airflow-core/src/airflow/serialization/enums.py:
##########
@@ -31,6 +32,26 @@ class Encoding(str, Enum):
VAR = "__var"
+def stringify_encoding_keys(d: Any) -> Any:
+ """
+ Convert BaseSerialization Encoding enum keys to their string values
recursively.
+
+ Python 3.10 compatibility: str(Encoding.TYPE) returns "Encoding.TYPE" on
3.10
+ instead of "__type__" (3.10 is still the default CI target).
serde.serialize
+ uses str(k) for dict keys, so without this conversion the encrypted blob
ends up
+ with "Encoding.TYPE" keys that neither serde._convert nor the
BaseSerialization
+ fallback can read back.
+ """
+ if isinstance(d, dict):
+ return {
+ (k.value if isinstance(k, Encoding) else str(k)):
stringify_encoding_keys(v) for k, v in d.items()
+ }
+ if isinstance(d, (list, tuple)):
Review Comment:
Handled in
https://github.com/apache/airflow/commit/a65df33af92b10ae49f3841635f0cc034bed8dc5
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -2295,6 +2295,41 @@ def execute_complete(self):
assert ti.state == TaskInstanceState.DEFERRED
[email protected]_serialized_dag
+def test_schedule_tis_start_trigger_next_kwargs_round_trip(dag_maker, session):
+ """next_kwargs with encoded values (timedelta) must survive the defer_task
round-trip."""
+ import datetime
+
+ from airflow.sdk.serde import deserialize
+
+ class TestOperator(BaseOperator):
+ start_trigger_args = StartTriggerArgs(
+ trigger_cls="airflow.triggers.testing.SuccessTrigger",
+ trigger_kwargs={},
+ next_method="execute_complete",
+ next_kwargs={"delay": datetime.timedelta(seconds=30)},
+ timeout=None,
+ )
+ start_from_trigger = True
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ def execute_complete(self):
+ pass
+
+ with dag_maker(session=session):
+ TestOperator(task_id="test_task")
+
+ dr: DagRun = dag_maker.create_dagrun()
+ ti = dr.get_task_instance("test_task")
+ ti.task = dr.dag.get_task("test_task")
+ dr.schedule_tis((ti,), session=session)
+
+ assert ti.state == TaskInstanceState.DEFERRED
+ assert deserialize(ti.next_kwargs) == {"delay":
datetime.timedelta(seconds=30)}
+
+
Review Comment:
Handled in
https://github.com/apache/airflow/commit/a65df33af92b10ae49f3841635f0cc034bed8dc5
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]