This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6b0ccd38417 Register custom deadline references via plugins, matching
timetable pattern (#66737)
6b0ccd38417 is described below
commit 6b0ccd384171fa9d662b13d429f180d78487b538
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon May 18 11:32:21 2026 +0200
Register custom deadline references via plugins, matching timetable pattern
(#66737)
* Register custom deadline references via plugins, matching timetable
pattern
Align custom deadline reference resolution with the existing
timetable / partition-mapper plugin pattern. Adds a new
`deadline_references` attribute to `AirflowPlugin` and a matching
`find_registered_custom_deadline_reference` helper.
`SerializedCustomReference.deserialize_reference()` now resolves
class paths through the registry instead of `import_string()` on
the serialized payload.
Generated-by: Claude Opus 4.7 (1M context) following the guidelines at
https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions
* Add newsfragment for deadline-references plugin migration
Generated-by: Claude Opus 4.7 (1M context) following the guidelines at
https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions
---
.../docs/administration-and-deployment/plugins.rst | 7 ++
airflow-core/newsfragments/66737.significant.rst | 1 +
airflow-core/src/airflow/plugins_manager.py | 13 +++
.../airflow/serialization/definitions/deadline.py | 4 +-
airflow-core/src/airflow/serialization/helpers.py | 27 ++++++
.../test_deadline_reference_registry.py | 100 +++++++++++++++++++++
.../plugins_manager/plugins_manager.py | 3 +
7 files changed, 153 insertions(+), 2 deletions(-)
diff --git a/airflow-core/docs/administration-and-deployment/plugins.rst
b/airflow-core/docs/administration-and-deployment/plugins.rst
index dad0f9991e3..4f616157bcc 100644
--- a/airflow-core/docs/administration-and-deployment/plugins.rst
+++ b/airflow-core/docs/administration-and-deployment/plugins.rst
@@ -156,6 +156,13 @@ looks like:
# A list of timetable classes to register so they can be used in Dags.
timetables = []
+ # A list of deadline reference classes that can be used as custom
deadlines in Dags.
+ # Custom deadline reference classes must be registered here in order
to be
+ # resolvable at scheduler-side deserialization time; classes that are
not
+ # registered will raise ``DeadlineReferenceNotRegistered`` when a Dag
attempts
+ # to use them.
+ deadline_references = []
+
# A list of Listeners that plugin provides. Listeners can register to
# listen to particular events that happen in Airflow, like
# TaskInstance state changes. Listeners are python modules.
diff --git a/airflow-core/newsfragments/66737.significant.rst
b/airflow-core/newsfragments/66737.significant.rst
new file mode 100644
index 00000000000..18f0e39331a
--- /dev/null
+++ b/airflow-core/newsfragments/66737.significant.rst
@@ -0,0 +1 @@
+Custom deadline reference classes must now be registered via the new
``deadline_references`` attribute on ``AirflowPlugin``, matching the existing
pattern for custom timetables and custom partition mappers. To use a custom
``DeadlineReference`` subclass, register it in a plugin's
``deadline_references`` list. Custom references that are not registered will
raise ``DeadlineReferenceNotRegistered`` at deserialization.
diff --git a/airflow-core/src/airflow/plugins_manager.py
b/airflow-core/src/airflow/plugins_manager.py
index 9b0487a8985..c24b4bbd721 100644
--- a/airflow-core/src/airflow/plugins_manager.py
+++ b/airflow-core/src/airflow/plugins_manager.py
@@ -39,6 +39,7 @@ from airflow.configuration import conf
if TYPE_CHECKING:
from airflow.listeners.listener import ListenerManager
+ from airflow.models.deadline import DeadlineReferenceType
from airflow.partition_mappers.base import PartitionMapper
from airflow.task.priority_strategy import PriorityWeightStrategy
from airflow.timetables.base import Timetable
@@ -286,6 +287,18 @@ def get_partition_mapper_plugins() -> dict[str,
type[PartitionMapper]]:
}
+@cache
+def get_deadline_references_plugins() -> dict[str,
type[DeadlineReferenceType]]:
+ """Collect and get deadline reference classes registered by plugins."""
+ log.debug("Initialize extra deadline reference plugins")
+
+ return {
+ qualname(deadline_ref_cls): deadline_ref_cls
+ for plugin in _get_plugins()[0]
+ for deadline_ref_cls in plugin.deadline_references
+ }
+
+
@cache
def integrate_macros_plugins() -> None:
"""Integrates macro plugins."""
diff --git a/airflow-core/src/airflow/serialization/definitions/deadline.py
b/airflow-core/src/airflow/serialization/definitions/deadline.py
index 0be0c849107..58eaa46e6f7 100644
--- a/airflow-core/src/airflow/serialization/definitions/deadline.py
+++ b/airflow-core/src/airflow/serialization/definitions/deadline.py
@@ -307,9 +307,9 @@ class SerializedReferenceModels:
@classmethod
def deserialize_reference(cls, reference_data: dict):
- from airflow._shared.module_loading import import_string
+ from airflow.serialization.helpers import
find_registered_custom_deadline_reference
- custom_class = import_string(reference_data["__class_path"])
+ custom_class =
find_registered_custom_deadline_reference(reference_data["__class_path"])
inner_ref = custom_class.deserialize_reference(reference_data)
return cls(inner_ref)
diff --git a/airflow-core/src/airflow/serialization/helpers.py
b/airflow-core/src/airflow/serialization/helpers.py
index 83b57d1c7cc..213b1991d6d 100644
--- a/airflow-core/src/airflow/serialization/helpers.py
+++ b/airflow-core/src/airflow/serialization/helpers.py
@@ -28,6 +28,7 @@ from airflow._shared.template_rendering import
truncate_rendered_value
from airflow.configuration import conf
if TYPE_CHECKING:
+ from airflow.models.deadline import DeadlineReferenceType
from airflow.partition_mappers.base import PartitionMapper
from airflow.timetables.base import Timetable as CoreTimetable
@@ -145,6 +146,32 @@ def
find_registered_custom_partition_mapper(importable_string: str) -> type[Part
raise PartitionMapperNotFound(importable_string)
+class DeadlineReferenceNotRegistered(ValueError):
+ """When an unregistered custom deadline reference is being accessed."""
+
+ def __init__(self, type_string: str) -> None:
+ self.type_string = type_string
+
+ def __str__(self) -> str:
+ return (
+ f"Custom deadline reference class {self.type_string!r} is not "
+ "registered. Custom deadline references must be registered via the
"
+ "`deadline_references` attribute on an AirflowPlugin."
+ )
+
+
+def find_registered_custom_deadline_reference(
+ importable_string: str,
+) -> type[DeadlineReferenceType]:
+ """Find a user-defined custom deadline reference class registered via a
plugin."""
+ from airflow import plugins_manager
+
+ deadline_ref_classes = plugins_manager.get_deadline_references_plugins()
+ with contextlib.suppress(KeyError):
+ return deadline_ref_classes[importable_string]
+ raise DeadlineReferenceNotRegistered(importable_string)
+
+
def is_core_timetable_import_path(importable_string: str) -> bool:
"""Whether an importable string points to a core timetable class."""
return importable_string.startswith("airflow.timetables.")
diff --git
a/airflow-core/tests/unit/serialization/test_deadline_reference_registry.py
b/airflow-core/tests/unit/serialization/test_deadline_reference_registry.py
new file mode 100644
index 00000000000..feae74bf980
--- /dev/null
+++ b/airflow-core/tests/unit/serialization/test_deadline_reference_registry.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow import plugins_manager
+from airflow.models.deadline import ReferenceModels
+from airflow.serialization.definitions.deadline import
SerializedReferenceModels
+from airflow.serialization.helpers import (
+ DeadlineReferenceNotRegistered,
+ find_registered_custom_deadline_reference,
+)
+
+
+class _RegisteredCustomReference(ReferenceModels.BaseDeadlineReference):
+ """Fake deadline reference registered through a plugin in these tests."""
+
+ required_kwargs: set[str] = set()
+
+ @classmethod
+ def deserialize_reference(cls, reference_data: dict):
+ return cls()
+
+ def serialize_reference(self) -> dict:
+ return {}
+
+ def _evaluate_with(self, *, session, **kwargs):
+ return None
+
+
+_IMPORTABLE =
f"{_RegisteredCustomReference.__module__}._RegisteredCustomReference"
+
+
[email protected]
+def fake_plugin_registry(monkeypatch):
+ """Stub `get_deadline_references_plugins` to advertise a single registered
class."""
+ registered = {_IMPORTABLE: _RegisteredCustomReference}
+ monkeypatch.setattr(
+ plugins_manager,
+ "get_deadline_references_plugins",
+ lambda: registered,
+ )
+ return registered
+
+
+def test_find_registered_returns_class(fake_plugin_registry):
+ assert find_registered_custom_deadline_reference(_IMPORTABLE) is
_RegisteredCustomReference
+
+
+def test_find_registered_raises_for_unknown(fake_plugin_registry):
+ with pytest.raises(DeadlineReferenceNotRegistered) as exc_info:
+
find_registered_custom_deadline_reference("not.registered.SomeReference")
+ assert exc_info.value.type_string == "not.registered.SomeReference"
+ assert "not.registered.SomeReference" in str(exc_info.value)
+
+
+def test_find_registered_raises_when_registry_empty(monkeypatch):
+ monkeypatch.setattr(
+ plugins_manager,
+ "get_deadline_references_plugins",
+ lambda: {},
+ )
+ with pytest.raises(DeadlineReferenceNotRegistered):
+
find_registered_custom_deadline_reference("anything.at.all.MyReference")
+
+
+def test_serialized_custom_reference_uses_registry(fake_plugin_registry):
+ result =
SerializedReferenceModels.SerializedCustomReference.deserialize_reference(
+ {"__class_path": _IMPORTABLE}
+ )
+
+ assert isinstance(result,
SerializedReferenceModels.SerializedCustomReference)
+ assert isinstance(result.inner_ref, _RegisteredCustomReference)
+
+
+def test_serialized_custom_reference_rejects_unregistered(monkeypatch):
+ monkeypatch.setattr(
+ plugins_manager,
+ "get_deadline_references_plugins",
+ lambda: {},
+ )
+ with pytest.raises(DeadlineReferenceNotRegistered):
+
SerializedReferenceModels.SerializedCustomReference.deserialize_reference(
+ {"__class_path": "some.other.module.UnregisteredReference"}
+ )
diff --git
a/shared/plugins_manager/src/airflow_shared/plugins_manager/plugins_manager.py
b/shared/plugins_manager/src/airflow_shared/plugins_manager/plugins_manager.py
index 8fcc5c9c808..1632ccd7ac9 100644
---
a/shared/plugins_manager/src/airflow_shared/plugins_manager/plugins_manager.py
+++
b/shared/plugins_manager/src/airflow_shared/plugins_manager/plugins_manager.py
@@ -122,6 +122,9 @@ class AirflowPlugin:
# A list of timetable classes that can be used for Dag scheduling.
partition_mappers: list[Any] = []
+ # A list of deadline reference classes that can be used as custom
deadlines in Dags.
+ deadline_references: list[Any] = []
+
# A list of listeners that can be used for tracking task and Dag states.
listeners: list[ModuleType | object] = []