This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 18caee14972 Fix trigger datetime deserialization (#67795)
18caee14972 is described below

commit 18caee1497217ef2a0dd0b42bd8a05cb690459aa
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Fri Jun 5 21:52:37 2026 +0800

    Fix trigger datetime deserialization (#67795)
    
    * Fix trigger datetime deserialization
    
    * Fix test_encoders unit test
---
 airflow-core/tests/unit/models/test_trigger.py      | 21 +++++++++++++++++++++
 .../tests/unit/serialization/test_encoders.py       | 17 ++---------------
 .../src/airflow_shared/serialization/__init__.py    |  1 +
 .../src/airflow/sdk/serde/serializers/datetime.py   |  6 ++++++
 task-sdk/tests/task_sdk/serde/test_serializers.py   | 18 ++++++++++++++++++
 5 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/airflow-core/tests/unit/models/test_trigger.py 
b/airflow-core/tests/unit/models/test_trigger.py
index 04998f12e0a..1243a9112f9 100644
--- a/airflow-core/tests/unit/models/test_trigger.py
+++ b/airflow-core/tests/unit/models/test_trigger.py
@@ -37,8 +37,10 @@ from airflow.models.callback import Callback, 
TriggererCallback
 from airflow.models.xcom import XComModel
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk.definitions.callback import AsyncCallback
+from airflow.serialization.encoders import encode_trigger
 from airflow.serialization.serialized_objects import BaseSerialization
 from airflow.triggers.base import (
+    BaseEventTrigger,
     BaseTrigger,
     TaskFailedEvent,
     TaskSkippedEvent,
@@ -932,6 +934,25 @@ def test_kwargs_not_encrypted():
     assert trigger.kwargs["param2"] == "value2"
 
 
+def test_decrypt_kwargs_roundtrips_datetime():
+    """
+    A datetime kwarg encoded via BaseSerialization (the asset-watcher path) 
must survive
+    the encrypt/decrypt round-trip through serde without crashing, and the 
DAG-side and
+    DB-side trigger hashes must match so the trigger is not needlessly 
recreated.
+
+    Regression: serde lacked a legacy-compat mapping for the bare ``datetime`` 
timestamp,
+    so ``_decrypt_kwargs`` raised and the asset-watcher trigger could not be 
read back.
+    """
+    classpath = "airflow.providers.standard.triggers.temporal.DateTimeTrigger"
+    moment = datetime.datetime(2026, 1, 15, 12, 30, 
tzinfo=datetime.timezone.utc)
+
+    dag_kwargs = encode_trigger({"classpath": classpath, "kwargs": {"moment": 
moment}})["kwargs"]
+    decrypted = Trigger._decrypt_kwargs(Trigger.encrypt_kwargs(dag_kwargs))
+
+    assert decrypted["moment"].timestamp() == moment.timestamp()
+    assert BaseEventTrigger.hash(classpath, dag_kwargs) == 
BaseEventTrigger.hash(classpath, decrypted)
+
+
 def test_asset_trigger_unassigned_included(session):
     """Asset triggers with triggerer_id=None are returned."""
     asset = AssetModel("test_asset")
diff --git a/airflow-core/tests/unit/serialization/test_encoders.py 
b/airflow-core/tests/unit/serialization/test_encoders.py
index d2db4277ce8..a5fa0f0ddb6 100644
--- a/airflow-core/tests/unit/serialization/test_encoders.py
+++ b/airflow-core/tests/unit/serialization/test_encoders.py
@@ -164,19 +164,6 @@ _TRIGGER_PARAMS = [
     ),
 ]
 
-# datetime_kwarg can't round-trip via Trigger.encrypt_kwargs / _decrypt_kwargs 
yet:
-# serde lacks the "datetime" qualname mapping. xfail strict=True auto-detects 
the fix.
-_DB_ROUND_TRIP_PARAMS = [
-    pytest.param(
-        *p.values,
-        id=p.id,
-        marks=pytest.mark.xfail(strict=True, reason="serde missing datetime 
classname mapping"),
-    )
-    if p.id == "datetime_kwarg"
-    else p
-    for p in _TRIGGER_PARAMS
-]
-
 
 def _assert_fully_serialized(encoded_kwargs: dict[str, Any]) -> None:
     """Assert encoded kwargs are fully JSON-safe and not immediately 
re-wrapped.
@@ -308,7 +295,7 @@ class TestTriggerHashConsistency:
         session.execute(delete(Trigger))
         session.commit()
 
-    @pytest.mark.parametrize("trigger", _DB_ROUND_TRIP_PARAMS)
+    @pytest.mark.parametrize("trigger", _TRIGGER_PARAMS)
     def test_hash_matches_after_db_round_trip(self, trigger, session):
         """Hash from DAG-parsed kwargs equals hash from a DB-persisted 
Trigger."""
         encoded = encode_trigger(trigger)
@@ -335,7 +322,7 @@ class TestTriggerHashConsistency:
 
         assert dag_hash == db_hash
 
-    @pytest.mark.parametrize("trigger", _DB_ROUND_TRIP_PARAMS)
+    @pytest.mark.parametrize("trigger", _TRIGGER_PARAMS)
     def test_hash_matches_after_re_encode_and_db_round_trip(self, trigger, 
session):
         """Hash stays consistent when encode_trigger output is re-encoded
         (deserialized-DAG re-serialization path) before DB storage.
diff --git a/shared/serialization/src/airflow_shared/serialization/__init__.py 
b/shared/serialization/src/airflow_shared/serialization/__init__.py
index 824848dc679..03c00157ef2 100644
--- a/shared/serialization/src/airflow_shared/serialization/__init__.py
+++ b/shared/serialization/src/airflow_shared/serialization/__init__.py
@@ -47,4 +47,5 @@ OLD_TYPE_TO_FULL_QUALNAME: dict[str, str] = {
     "set": "builtins.set",
     "frozenset": "builtins.frozenset",
     "timedelta": "datetime.timedelta",
+    "datetime": "datetime.datetime",
 }
diff --git a/task-sdk/src/airflow/sdk/serde/serializers/datetime.py 
b/task-sdk/src/airflow/sdk/serde/serializers/datetime.py
index 9c21ee115a2..4b471c56ef7 100644
--- a/task-sdk/src/airflow/sdk/serde/serializers/datetime.py
+++ b/task-sdk/src/airflow/sdk/serde/serializers/datetime.py
@@ -95,6 +95,12 @@ def deserialize(cls: type, version: int, data: dict | str) 
-> datetime.date | da
     if cls is datetime.datetime and isinstance(data, dict):
         return datetime.datetime.fromtimestamp(float(data[TIMESTAMP]), tz=tz)
 
+    if cls is datetime.datetime and isinstance(data, int | float):
+        # Legacy BaseSerialization stored datetimes as a bare UTC timestamp 
float
+        # (rather than serde's {timestamp, tz} dict). Round-trip that form so 
trigger
+        # kwargs encoded via BaseSerialization can be read back through serde.
+        return datetime.datetime.fromtimestamp(float(data), 
tz=datetime.timezone.utc)
+
     if cls is DateTime and isinstance(data, dict):
         return DateTime.fromtimestamp(float(data[TIMESTAMP]), tz=tz)
 
diff --git a/task-sdk/tests/task_sdk/serde/test_serializers.py 
b/task-sdk/tests/task_sdk/serde/test_serializers.py
index 7935ee2e4c6..7e2ade64ebd 100644
--- a/task-sdk/tests/task_sdk/serde/test_serializers.py
+++ b/task-sdk/tests/task_sdk/serde/test_serializers.py
@@ -146,6 +146,24 @@ class TestSerializers:
         if tz_input in ["EDT", "CDT", "MDT", "PDT"]:
             assert deserialize(serialize(deserialized_dt)) == deserialized_dt
 
+    def test_deserialize_legacy_datetime_bare_timestamp(self):
+        """Legacy BaseSerialization stored datetimes as ``{"__type": 
"datetime", "__var": <utc ts>}``.
+
+        serde must read that form back (used e.g. for trigger kwargs encoded 
via
+        BaseSerialization) and reconstruct a UTC ``datetime`` with the same 
instant.
+        """
+        moment = datetime.datetime(2026, 1, 15, 12, 30, 
tzinfo=datetime.timezone.utc)
+        legacy = {"__type": "datetime", "__var": moment.timestamp()}
+
+        deserialized = deserialize(legacy)
+
+        assert isinstance(deserialized, datetime.datetime)
+        assert deserialized.timestamp() == moment.timestamp()
+
+        # The same form nested inside a dict (the shape trigger kwargs take).
+        nested = deserialize({"moment": legacy})
+        assert nested["moment"].timestamp() == moment.timestamp()
+
     @pytest.mark.parametrize(
         ("expr", "expected"),
         [("1", "1"), ("52e4", "520000"), ("2e0", "2"), ("12e-2", "0.12"), 
("12.34", "12.34")],

Reply via email to