This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e9bdb245e0 Remove dead AIP-44 trigger-over-BaseSerialization path (DAT.BASE_TRIGGER) (#68528)
7e9bdb245e0 is described below

commit 7e9bdb245e09373d91fcb7bda575ad4be63aa28d
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Jun 16 22:54:00 2026 -0400

    Remove dead AIP-44 trigger-over-BaseSerialization path (DAT.BASE_TRIGGER) (#68528)
    
    The DAT.BASE_TRIGGER encode/decode (serializing a BaseTrigger instance via
    BaseSerialization) is a vestige of AIP-44's Internal API. Live deferral uses
    the structured DeferTask/TIDeferredStatePayload (classpath+kwargs); the
    execution API stores the classpath opaquely; the triggerer imports it from
    the DB row. No external producer/consumer of DAT.BASE_TRIGGER remains. The
    generic AirflowException serialization is still live and is kept.
    
    Generated-by: Claude Opus 4.8 (1M context)
---
 airflow-core/src/airflow/serialization/enums.py    |  1 -
 .../airflow/serialization/serialized_objects.py    | 17 ++----------
 .../unit/serialization/test_serialized_objects.py  | 32 +---------------------
 3 files changed, 3 insertions(+), 47 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/enums.py b/airflow-core/src/airflow/serialization/enums.py
index ae4c1249cab..a1f16ac9bfb 100644
--- a/airflow-core/src/airflow/serialization/enums.py
+++ b/airflow-core/src/airflow/serialization/enums.py
@@ -70,7 +70,6 @@ class DagAttributeTypes(str, Enum):
     TIMEDELTA = "timedelta"
     TIMEZONE = "timezone"
     RELATIVEDELTA = "relativedelta"
-    BASE_TRIGGER = "base_trigger"
     AIRFLOW_EXC_SER = "airflow_exc_ser"
     BASE_EXC_SER = "base_exc_ser"
     DICT = "dict"
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index d6d45bdcd97..14bbd343359 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -106,7 +106,7 @@ from airflow.task.priority_strategy import (
     validate_and_load_priority_weight_strategy,
 )
 from airflow.timetables.base import DagRunInfo, Timetable
-from airflow.triggers.base import BaseTrigger, StartTriggerArgs
+from airflow.triggers.base import StartTriggerArgs
 from airflow.utils.code_utils import get_python_source
 from airflow.utils.db import LazySelectSequence
 
@@ -470,7 +470,6 @@ class BaseSerialization:
         :meta private:
         """
         from airflow.sdk.definitions._internal.types import is_arg_set
-        from airflow.sdk.exceptions import TaskDeferred
 
         if not is_arg_set(var):
             return cls._encode(None, type_=DAT.ARG_NOT_SET)
@@ -535,7 +534,7 @@ class BaseSerialization:
                 var._asdict(),
                 type_=DAT.TASK_INSTANCE_KEY,
             )
-        elif isinstance(var, (AirflowException, TaskDeferred)) and hasattr(var, "serialize"):
+        elif isinstance(var, AirflowException) and hasattr(var, "serialize"):
             exc_cls_name, args, kwargs = var.serialize()
             return cls._encode(
                 cls.serialize(
@@ -556,14 +555,6 @@ class BaseSerialization:
                 ),
                 type_=DAT.BASE_EXC_SER,
             )
-        elif isinstance(var, BaseTrigger):
-            return cls._encode(
-                cls.serialize(
-                    var.serialize(),
-                    strict=strict,
-                ),
-                type_=DAT.BASE_TRIGGER,
-            )
         elif callable(var):
             return str(get_python_source(var))
         elif isinstance(var, set):
@@ -672,10 +663,6 @@ class BaseSerialization:
             else:
                 exc_cls = import_string(f"builtins.{exc_cls_name}")
             return exc_cls(*args, **kwargs)
-        elif type_ == DAT.BASE_TRIGGER:
-            tr_cls_name, kwargs = cls.deserialize(var)
-            tr_cls = import_string(tr_cls_name)
-            return tr_cls(**kwargs)
         elif type_ == DAT.SET:
             return {cls.deserialize(v) for v in var}
         elif type_ == DAT.TUPLE:
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 58cb1f77900..2d2119095d3 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -39,7 +39,6 @@ from airflow.exceptions import (
     AirflowFailException,
     AirflowRescheduleException,
     SerializationError,
-    TaskDeferred,
 )
 from airflow.models.connection import Connection
 from airflow.models.dag import DAG
@@ -104,7 +103,6 @@ from airflow.serialization.serialized_objects import (
     LazyDeserializedDAG,
     _has_kubernetes,
 )
-from airflow.triggers.base import BaseTrigger
 from airflow.utils.db import LazySelectSequence
 
 from unit.models import DEFAULT_DATE
@@ -570,42 +568,14 @@ def test_ser_of_asset_event_accessor():
     assert d[Asset(name="yo", uri="test://yo")].extra == {"this": "that", "the": "other"}
 
 
-class MyTrigger(BaseTrigger):
-    def __init__(self, hi):
-        self.hi = hi
-
-    def serialize(self):
-        return "unit.serialization.test_serialized_objects.MyTrigger", {"hi": self.hi}
-
-    async def run(self):
-        yield
-
-
 def test_roundtrip_exceptions():
-    """
-    This is for AIP-44 when we need to send certain non-error exceptions
-    as part of an RPC call e.g. TaskDeferred or AirflowRescheduleException.
-    """
+    """Non-error AirflowExceptions (e.g. AirflowRescheduleException) round-trip through BaseSerialization."""
     some_date = pendulum.now()
     resched_exc = AirflowRescheduleException(reschedule_date=some_date)
     ser = BaseSerialization.serialize(resched_exc)
     deser = BaseSerialization.deserialize(ser)
     assert isinstance(deser, AirflowRescheduleException)
     assert deser.reschedule_date == some_date
-    del ser
-    del deser
-    exc = TaskDeferred(
-        trigger=MyTrigger(hi="yo"),
-        method_name="meth_name",
-        kwargs={"have": "pie"},
-        timeout=timedelta(seconds=30),
-    )
-    ser = BaseSerialization.serialize(exc)
-    deser = BaseSerialization.deserialize(ser)
-    assert deser.trigger.hi == "yo"
-    assert deser.method_name == "meth_name"
-    assert deser.kwargs == {"have": "pie"}
-    assert deser.timeout == timedelta(seconds=30)
 
 
 @pytest.mark.parametrize(

Reply via email to