This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7e9bdb245e0 Remove dead AIP-44 trigger-over-BaseSerialization path
(DAT.BASE_TRIGGER) (#68528)
7e9bdb245e0 is described below
commit 7e9bdb245e09373d91fcb7bda575ad4be63aa28d
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Jun 16 22:54:00 2026 -0400
Remove dead AIP-44 trigger-over-BaseSerialization path (DAT.BASE_TRIGGER)
(#68528)
The DAT.BASE_TRIGGER encode/decode (serializing a BaseTrigger instance via
BaseSerialization) is a vestige of AIP-44's Internal API. Live deferral uses
the structured DeferTask/TIDeferredStatePayload (classpath+kwargs); the
execution API stores the classpath opaquely; the triggerer imports it from the
DB row. No external producer/consumer of DAT.BASE_TRIGGER remains. Generic
AirflowException serialization is still in live use and is kept; only the
TaskDeferred alternative in its isinstance check is dropped.
Generated-by: Claude Opus 4.8 (1M context)
---
airflow-core/src/airflow/serialization/enums.py | 1 -
.../airflow/serialization/serialized_objects.py | 17 ++----------
.../unit/serialization/test_serialized_objects.py | 32 +---------------------
3 files changed, 3 insertions(+), 47 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/enums.py
b/airflow-core/src/airflow/serialization/enums.py
index ae4c1249cab..a1f16ac9bfb 100644
--- a/airflow-core/src/airflow/serialization/enums.py
+++ b/airflow-core/src/airflow/serialization/enums.py
@@ -70,7 +70,6 @@ class DagAttributeTypes(str, Enum):
TIMEDELTA = "timedelta"
TIMEZONE = "timezone"
RELATIVEDELTA = "relativedelta"
- BASE_TRIGGER = "base_trigger"
AIRFLOW_EXC_SER = "airflow_exc_ser"
BASE_EXC_SER = "base_exc_ser"
DICT = "dict"
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py
b/airflow-core/src/airflow/serialization/serialized_objects.py
index d6d45bdcd97..14bbd343359 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -106,7 +106,7 @@ from airflow.task.priority_strategy import (
validate_and_load_priority_weight_strategy,
)
from airflow.timetables.base import DagRunInfo, Timetable
-from airflow.triggers.base import BaseTrigger, StartTriggerArgs
+from airflow.triggers.base import StartTriggerArgs
from airflow.utils.code_utils import get_python_source
from airflow.utils.db import LazySelectSequence
@@ -470,7 +470,6 @@ class BaseSerialization:
:meta private:
"""
from airflow.sdk.definitions._internal.types import is_arg_set
- from airflow.sdk.exceptions import TaskDeferred
if not is_arg_set(var):
return cls._encode(None, type_=DAT.ARG_NOT_SET)
@@ -535,7 +534,7 @@ class BaseSerialization:
var._asdict(),
type_=DAT.TASK_INSTANCE_KEY,
)
- elif isinstance(var, (AirflowException, TaskDeferred)) and
hasattr(var, "serialize"):
+ elif isinstance(var, AirflowException) and hasattr(var, "serialize"):
exc_cls_name, args, kwargs = var.serialize()
return cls._encode(
cls.serialize(
@@ -556,14 +555,6 @@ class BaseSerialization:
),
type_=DAT.BASE_EXC_SER,
)
- elif isinstance(var, BaseTrigger):
- return cls._encode(
- cls.serialize(
- var.serialize(),
- strict=strict,
- ),
- type_=DAT.BASE_TRIGGER,
- )
elif callable(var):
return str(get_python_source(var))
elif isinstance(var, set):
@@ -672,10 +663,6 @@ class BaseSerialization:
else:
exc_cls = import_string(f"builtins.{exc_cls_name}")
return exc_cls(*args, **kwargs)
- elif type_ == DAT.BASE_TRIGGER:
- tr_cls_name, kwargs = cls.deserialize(var)
- tr_cls = import_string(tr_cls_name)
- return tr_cls(**kwargs)
elif type_ == DAT.SET:
return {cls.deserialize(v) for v in var}
elif type_ == DAT.TUPLE:
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 58cb1f77900..2d2119095d3 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -39,7 +39,6 @@ from airflow.exceptions import (
AirflowFailException,
AirflowRescheduleException,
SerializationError,
- TaskDeferred,
)
from airflow.models.connection import Connection
from airflow.models.dag import DAG
@@ -104,7 +103,6 @@ from airflow.serialization.serialized_objects import (
LazyDeserializedDAG,
_has_kubernetes,
)
-from airflow.triggers.base import BaseTrigger
from airflow.utils.db import LazySelectSequence
from unit.models import DEFAULT_DATE
@@ -570,42 +568,14 @@ def test_ser_of_asset_event_accessor():
assert d[Asset(name="yo", uri="test://yo")].extra == {"this": "that",
"the": "other"}
-class MyTrigger(BaseTrigger):
- def __init__(self, hi):
- self.hi = hi
-
- def serialize(self):
- return "unit.serialization.test_serialized_objects.MyTrigger", {"hi":
self.hi}
-
- async def run(self):
- yield
-
-
def test_roundtrip_exceptions():
- """
- This is for AIP-44 when we need to send certain non-error exceptions
- as part of an RPC call e.g. TaskDeferred or AirflowRescheduleException.
- """
+ """Non-error AirflowExceptions (e.g. AirflowRescheduleException)
round-trip through BaseSerialization."""
some_date = pendulum.now()
resched_exc = AirflowRescheduleException(reschedule_date=some_date)
ser = BaseSerialization.serialize(resched_exc)
deser = BaseSerialization.deserialize(ser)
assert isinstance(deser, AirflowRescheduleException)
assert deser.reschedule_date == some_date
- del ser
- del deser
- exc = TaskDeferred(
- trigger=MyTrigger(hi="yo"),
- method_name="meth_name",
- kwargs={"have": "pie"},
- timeout=timedelta(seconds=30),
- )
- ser = BaseSerialization.serialize(exc)
- deser = BaseSerialization.deserialize(ser)
- assert deser.trigger.hi == "yo"
- assert deser.method_name == "meth_name"
- assert deser.kwargs == {"have": "pie"}
- assert deser.timeout == timedelta(seconds=30)
@pytest.mark.parametrize(