This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 18caee14972 Fix trigger datetime deserialization (#67795)
18caee14972 is described below
commit 18caee1497217ef2a0dd0b42bd8a05cb690459aa
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Fri Jun 5 21:52:37 2026 +0800
Fix trigger datetime deserialization (#67795)
* Fix trigger datetime deserialization
* Fix test_encoders unit test
---
airflow-core/tests/unit/models/test_trigger.py | 21 +++++++++++++++++++++
.../tests/unit/serialization/test_encoders.py | 17 ++---------------
.../src/airflow_shared/serialization/__init__.py | 1 +
.../src/airflow/sdk/serde/serializers/datetime.py | 6 ++++++
task-sdk/tests/task_sdk/serde/test_serializers.py | 18 ++++++++++++++++++
5 files changed, 48 insertions(+), 15 deletions(-)
diff --git a/airflow-core/tests/unit/models/test_trigger.py b/airflow-core/tests/unit/models/test_trigger.py
index 04998f12e0a..1243a9112f9 100644
--- a/airflow-core/tests/unit/models/test_trigger.py
+++ b/airflow-core/tests/unit/models/test_trigger.py
@@ -37,8 +37,10 @@ from airflow.models.callback import Callback, TriggererCallback
from airflow.models.xcom import XComModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.callback import AsyncCallback
+from airflow.serialization.encoders import encode_trigger
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.triggers.base import (
+ BaseEventTrigger,
BaseTrigger,
TaskFailedEvent,
TaskSkippedEvent,
@@ -932,6 +934,25 @@ def test_kwargs_not_encrypted():
assert trigger.kwargs["param2"] == "value2"
+def test_decrypt_kwargs_roundtrips_datetime():
+ """
+ A datetime kwarg encoded via BaseSerialization (the asset-watcher path) must survive
+ the encrypt/decrypt round-trip through serde without crashing, and the DAG-side and
+ DB-side trigger hashes must match so the trigger is not needlessly recreated.
+
+ Regression: serde lacked a legacy-compat mapping for the bare ``datetime`` timestamp,
+ so ``_decrypt_kwargs`` raised and the asset-watcher trigger could not be read back.
+ """
+ classpath = "airflow.providers.standard.triggers.temporal.DateTimeTrigger"
+ moment = datetime.datetime(2026, 1, 15, 12, 30, tzinfo=datetime.timezone.utc)
+
+ dag_kwargs = encode_trigger({"classpath": classpath, "kwargs": {"moment": moment}})["kwargs"]
+ decrypted = Trigger._decrypt_kwargs(Trigger.encrypt_kwargs(dag_kwargs))
+
+ assert decrypted["moment"].timestamp() == moment.timestamp()
+ assert BaseEventTrigger.hash(classpath, dag_kwargs) == BaseEventTrigger.hash(classpath, decrypted)
+
+
def test_asset_trigger_unassigned_included(session):
"""Asset triggers with triggerer_id=None are returned."""
asset = AssetModel("test_asset")
diff --git a/airflow-core/tests/unit/serialization/test_encoders.py b/airflow-core/tests/unit/serialization/test_encoders.py
index d2db4277ce8..a5fa0f0ddb6 100644
--- a/airflow-core/tests/unit/serialization/test_encoders.py
+++ b/airflow-core/tests/unit/serialization/test_encoders.py
@@ -164,19 +164,6 @@ _TRIGGER_PARAMS = [
),
]
-# datetime_kwarg can't round-trip via Trigger.encrypt_kwargs / _decrypt_kwargs yet:
-# serde lacks the "datetime" qualname mapping. xfail strict=True auto-detects the fix.
-_DB_ROUND_TRIP_PARAMS = [
- pytest.param(
- *p.values,
- id=p.id,
- marks=pytest.mark.xfail(strict=True, reason="serde missing datetime classname mapping"),
- )
- if p.id == "datetime_kwarg"
- else p
- for p in _TRIGGER_PARAMS
-]
-
def _assert_fully_serialized(encoded_kwargs: dict[str, Any]) -> None:
"""Assert encoded kwargs are fully JSON-safe and not immediately re-wrapped.
@@ -308,7 +295,7 @@ class TestTriggerHashConsistency:
session.execute(delete(Trigger))
session.commit()
- @pytest.mark.parametrize("trigger", _DB_ROUND_TRIP_PARAMS)
+ @pytest.mark.parametrize("trigger", _TRIGGER_PARAMS)
def test_hash_matches_after_db_round_trip(self, trigger, session):
"""Hash from DAG-parsed kwargs equals hash from a DB-persisted Trigger."""
encoded = encode_trigger(trigger)
@@ -335,7 +322,7 @@ class TestTriggerHashConsistency:
assert dag_hash == db_hash
- @pytest.mark.parametrize("trigger", _DB_ROUND_TRIP_PARAMS)
+ @pytest.mark.parametrize("trigger", _TRIGGER_PARAMS)
def test_hash_matches_after_re_encode_and_db_round_trip(self, trigger, session):
"""Hash stays consistent when encode_trigger output is re-encoded
(deserialized-DAG re-serialization path) before DB storage.
diff --git a/shared/serialization/src/airflow_shared/serialization/__init__.py b/shared/serialization/src/airflow_shared/serialization/__init__.py
index 824848dc679..03c00157ef2 100644
--- a/shared/serialization/src/airflow_shared/serialization/__init__.py
+++ b/shared/serialization/src/airflow_shared/serialization/__init__.py
@@ -47,4 +47,5 @@ OLD_TYPE_TO_FULL_QUALNAME: dict[str, str] = {
"set": "builtins.set",
"frozenset": "builtins.frozenset",
"timedelta": "datetime.timedelta",
+ "datetime": "datetime.datetime",
}
diff --git a/task-sdk/src/airflow/sdk/serde/serializers/datetime.py b/task-sdk/src/airflow/sdk/serde/serializers/datetime.py
index 9c21ee115a2..4b471c56ef7 100644
--- a/task-sdk/src/airflow/sdk/serde/serializers/datetime.py
+++ b/task-sdk/src/airflow/sdk/serde/serializers/datetime.py
@@ -95,6 +95,12 @@ def deserialize(cls: type, version: int, data: dict | str) -> datetime.date | da
if cls is datetime.datetime and isinstance(data, dict):
return datetime.datetime.fromtimestamp(float(data[TIMESTAMP]), tz=tz)
+ if cls is datetime.datetime and isinstance(data, int | float):
+ # Legacy BaseSerialization stored datetimes as a bare UTC timestamp float
+ # (rather than serde's {timestamp, tz} dict). Round-trip that form so trigger
+ # kwargs encoded via BaseSerialization can be read back through serde.
+ return datetime.datetime.fromtimestamp(float(data), tz=datetime.timezone.utc)
+
if cls is DateTime and isinstance(data, dict):
return DateTime.fromtimestamp(float(data[TIMESTAMP]), tz=tz)
diff --git a/task-sdk/tests/task_sdk/serde/test_serializers.py b/task-sdk/tests/task_sdk/serde/test_serializers.py
index 7935ee2e4c6..7e2ade64ebd 100644
--- a/task-sdk/tests/task_sdk/serde/test_serializers.py
+++ b/task-sdk/tests/task_sdk/serde/test_serializers.py
@@ -146,6 +146,24 @@ class TestSerializers:
if tz_input in ["EDT", "CDT", "MDT", "PDT"]:
assert deserialize(serialize(deserialized_dt)) == deserialized_dt
+ def test_deserialize_legacy_datetime_bare_timestamp(self):
+ """Legacy BaseSerialization stored datetimes as ``{"__type": "datetime", "__var": <utc ts>}``.
+
+ serde must read that form back (used e.g. for trigger kwargs encoded via
+ BaseSerialization) and reconstruct a UTC ``datetime`` with the same instant.
+ """
+ moment = datetime.datetime(2026, 1, 15, 12, 30, tzinfo=datetime.timezone.utc)
+ legacy = {"__type": "datetime", "__var": moment.timestamp()}
+
+ deserialized = deserialize(legacy)
+
+ assert isinstance(deserialized, datetime.datetime)
+ assert deserialized.timestamp() == moment.timestamp()
+
+ # The same form nested inside a dict (the shape trigger kwargs take).
+ nested = deserialize({"moment": legacy})
+ assert nested["moment"].timestamp() == moment.timestamp()
+
@pytest.mark.parametrize(
("expr", "expected"),
[("1", "1"), ("52e4", "520000"), ("2e0", "2"), ("12e-2", "0.12"), ("12.34", "12.34")],