This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 0e35aa9508f [v3-0-test] Ensure trigger kwargs are properly deserialized during trigger execution (#52693) (#52721)
0e35aa9508f is described below

commit 0e35aa9508f37c97b7543d1ab32c63ebb6a136b7
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jul 2 22:43:43 2025 +0530

    [v3-0-test] Ensure trigger kwargs are properly deserialized during trigger execution (#52693) (#52721)
    
    (cherry picked from commit 64a5919b992246b4ec6db05025ca1d5f8194ce82)
    
    Co-authored-by: Amogh Desai <[email protected]>
---
 .../src/airflow/jobs/triggerer_job_runner.py       | 12 +++++-
 .../airflow/serialization/serialized_objects.py    | 25 ++++++------
 airflow-core/tests/unit/jobs/test_triggerer_job.py | 45 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 14 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 8d2bdf99795..512f8a4f0e6 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -875,8 +875,16 @@ class TriggerRunner:
             await asyncio.sleep(0)
 
             try:
-                kwargs = Trigger._decrypt_kwargs(workload.encrypted_kwargs)
-                trigger_instance = trigger_class(**kwargs)
+                from airflow.serialization.serialized_objects import smart_decode_trigger_kwargs
+
+                # Decrypt and clean trigger kwargs before for execution
+                # Note: We only clean up serialization artifacts (__var, __type keys) here,
+                # not in `_decrypt_kwargs` because it is used during hash comparison in
+                # add_asset_trigger_references and could lead to adverse effects like hash mismatches
+                # that could cause None values in collections.
+                kw = Trigger._decrypt_kwargs(workload.encrypted_kwargs)
+                deserialised_kwargs = {k: smart_decode_trigger_kwargs(v) for k, v in kw.items()}
+                trigger_instance = trigger_class(**deserialised_kwargs)
             except TypeError as err:
                 self.log.error("Trigger failed to inflate", error=err)
                 self.failed_triggers.append((trigger_id, err))
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index f3e49c4fabd..e3cf0f019d5 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -337,19 +337,20 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     raise ValueError(f"deserialization not implemented for DAT {dat!r}")
 
 
-def decode_asset(var: dict[str, Any]):
-    def _smart_decode_trigger_kwargs(d):
-        """
-        Slightly clean up kwargs for display.
+def smart_decode_trigger_kwargs(d):
+    """
+    Slightly clean up kwargs for display or execution.
 
-        This detects one level of BaseSerialization and tries to deserialize the
-        content, removing some __type __var ugliness when the value is displayed
-        in UI to the user.
-        """
-        if not isinstance(d, dict) or Encoding.TYPE not in d:
-            return d
-        return BaseSerialization.deserialize(d)
+    This detects one level of BaseSerialization and tries to deserialize the
+    content, removing some __type __var ugliness when the value is displayed
+    in UI to the user and/or while execution.
+    """
+    if not isinstance(d, dict) or Encoding.TYPE not in d:
+        return d
+    return BaseSerialization.deserialize(d)
 
+
+def decode_asset(var: dict[str, Any]):
     watchers = var.get("watchers", [])
     return Asset(
         name=var["name"],
@@ -361,7 +362,7 @@ def decode_asset(var: dict[str, Any]):
                 name=watcher["name"],
                 trigger={
                     "classpath": watcher["trigger"]["classpath"],
-                    "kwargs": _smart_decode_trigger_kwargs(watcher["trigger"]["kwargs"]),
+                    "kwargs": smart_decode_trigger_kwargs(watcher["trigger"]["kwargs"]),
                 },
             )
             for watcher in watchers
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 08e08f08c4f..dd1f2b174c8 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -326,6 +326,51 @@ class TestTriggerRunner:
         assert trigger_id == 1
         assert traceback[-1] == "ModuleNotFoundError: No module named 'fake'\n"
 
+    @pytest.mark.asyncio
+    async def test_trigger_kwargs_serialization_cleanup(self, session):
+        """
+        Test that trigger kwargs are properly cleaned of serialization artifacts
+        (__var, __type keys).
+        """
+        from airflow.serialization.serialized_objects import BaseSerialization
+
+        kw = {"simple": "test", "tuple": (), "dict": {}, "list": []}
+
+        serialized_kwargs = BaseSerialization.serialize(kw)
+
+        trigger_orm = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs=serialized_kwargs)
+        session.add(trigger_orm)
+        session.commit()
+
+        stored_kwargs = trigger_orm.kwargs
+        assert stored_kwargs == {
+            "Encoding.TYPE": "dict",
+            "Encoding.VAR": {
+                "dict": {"Encoding.TYPE": "dict", "Encoding.VAR": {}},
+                "list": [],
+                "simple": "test",
+                "tuple": {"Encoding.TYPE": "tuple", "Encoding.VAR": []},
+            },
+        }
+
+        runner = TriggerRunner()
+        runner.to_create.append(
+            workloads.RunTrigger.model_construct(
+                id=trigger_orm.id,
+                ti=None,
+                classpath=trigger_orm.classpath,
+                encrypted_kwargs=trigger_orm.encrypted_kwargs,
+            )
+        )
+
+        await runner.create_triggers()
+        assert trigger_orm.id in runner.triggers
+        trigger_instance = runner.triggers[trigger_orm.id]["task"]
+
+        # The test passes if no exceptions were raised during trigger creation
+        trigger_instance.cancel()
+        await runner.cleanup_finished_triggers()
+
 
 @pytest.mark.asyncio
 async def test_trigger_create_race_condition_38599(session, supervisor_builder):

Reply via email to