This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b0ccd38417 Register custom deadline references via plugins, matching timetable pattern (#66737)
6b0ccd38417 is described below

commit 6b0ccd384171fa9d662b13d429f180d78487b538
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon May 18 11:32:21 2026 +0200

    Register custom deadline references via plugins, matching timetable pattern (#66737)
    
    * Register custom deadline references via plugins, matching timetable pattern
    
    Align custom deadline reference resolution with the existing
    timetable / partition-mapper plugin pattern. Adds a new
    `deadline_references` attribute to `AirflowPlugin` and a matching
    `find_registered_custom_deadline_reference` helper.
    `SerializedCustomReference.deserialize_reference()` now resolves
    class paths through the registry instead of `import_string()` on
    the serialized payload.
    
    Generated-by: Claude Opus 4.7 (1M context) following the guidelines at
    https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions
    
    * Add newsfragment for deadline-references plugin migration
    
    Generated-by: Claude Opus 4.7 (1M context) following the guidelines at
    https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions
---
 .../docs/administration-and-deployment/plugins.rst |   7 ++
 airflow-core/newsfragments/66737.significant.rst   |   1 +
 airflow-core/src/airflow/plugins_manager.py        |  13 +++
 .../airflow/serialization/definitions/deadline.py  |   4 +-
 airflow-core/src/airflow/serialization/helpers.py  |  27 ++++++
 .../test_deadline_reference_registry.py            | 100 +++++++++++++++++++++
 .../plugins_manager/plugins_manager.py             |   3 +
 7 files changed, 153 insertions(+), 2 deletions(-)

diff --git a/airflow-core/docs/administration-and-deployment/plugins.rst b/airflow-core/docs/administration-and-deployment/plugins.rst
index dad0f9991e3..4f616157bcc 100644
--- a/airflow-core/docs/administration-and-deployment/plugins.rst
+++ b/airflow-core/docs/administration-and-deployment/plugins.rst
@@ -156,6 +156,13 @@ looks like:
         # A list of timetable classes to register so they can be used in Dags.
         timetables = []
 
+        # A list of deadline reference classes that can be used as custom deadlines in Dags.
+        # Custom deadline reference classes must be registered here in order to be
+        # resolvable at scheduler-side deserialization time; classes that are not
+        # registered will raise ``DeadlineReferenceNotRegistered`` when a Dag attempts
+        # to use them.
+        deadline_references = []
+
         # A list of Listeners that plugin provides. Listeners can register to
         # listen to particular events that happen in Airflow, like
         # TaskInstance state changes. Listeners are python modules.
diff --git a/airflow-core/newsfragments/66737.significant.rst b/airflow-core/newsfragments/66737.significant.rst
new file mode 100644
index 00000000000..18f0e39331a
--- /dev/null
+++ b/airflow-core/newsfragments/66737.significant.rst
@@ -0,0 +1 @@
+Custom deadline reference classes must now be registered via the new ``deadline_references`` attribute on ``AirflowPlugin``, matching the existing pattern for custom timetables and custom partition mappers. To use a custom ``DeadlineReference`` subclass, register it in a plugin's ``deadline_references`` list. Custom references that are not registered will raise ``DeadlineReferenceNotRegistered`` at deserialization.
diff --git a/airflow-core/src/airflow/plugins_manager.py b/airflow-core/src/airflow/plugins_manager.py
index 9b0487a8985..c24b4bbd721 100644
--- a/airflow-core/src/airflow/plugins_manager.py
+++ b/airflow-core/src/airflow/plugins_manager.py
@@ -39,6 +39,7 @@ from airflow.configuration import conf
 
 if TYPE_CHECKING:
     from airflow.listeners.listener import ListenerManager
+    from airflow.models.deadline import DeadlineReferenceType
     from airflow.partition_mappers.base import PartitionMapper
     from airflow.task.priority_strategy import PriorityWeightStrategy
     from airflow.timetables.base import Timetable
@@ -286,6 +287,18 @@ def get_partition_mapper_plugins() -> dict[str, type[PartitionMapper]]:
     }
 
 
+@cache
+def get_deadline_references_plugins() -> dict[str, type[DeadlineReferenceType]]:
+    """Collect and get deadline reference classes registered by plugins."""
+    log.debug("Initialize extra deadline reference plugins")
+
+    return {
+        qualname(deadline_ref_cls): deadline_ref_cls
+        for plugin in _get_plugins()[0]
+        for deadline_ref_cls in plugin.deadline_references
+    }
+
+
 @cache
 def integrate_macros_plugins() -> None:
     """Integrates macro plugins."""
diff --git a/airflow-core/src/airflow/serialization/definitions/deadline.py b/airflow-core/src/airflow/serialization/definitions/deadline.py
index 0be0c849107..58eaa46e6f7 100644
--- a/airflow-core/src/airflow/serialization/definitions/deadline.py
+++ b/airflow-core/src/airflow/serialization/definitions/deadline.py
@@ -307,9 +307,9 @@ class SerializedReferenceModels:
 
         @classmethod
         def deserialize_reference(cls, reference_data: dict):
-            from airflow._shared.module_loading import import_string
+            from airflow.serialization.helpers import find_registered_custom_deadline_reference
 
-            custom_class = import_string(reference_data["__class_path"])
+            custom_class = find_registered_custom_deadline_reference(reference_data["__class_path"])
             inner_ref = custom_class.deserialize_reference(reference_data)
             return cls(inner_ref)
 
diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py
index 83b57d1c7cc..213b1991d6d 100644
--- a/airflow-core/src/airflow/serialization/helpers.py
+++ b/airflow-core/src/airflow/serialization/helpers.py
@@ -28,6 +28,7 @@ from airflow._shared.template_rendering import truncate_rendered_value
 from airflow.configuration import conf
 
 if TYPE_CHECKING:
+    from airflow.models.deadline import DeadlineReferenceType
     from airflow.partition_mappers.base import PartitionMapper
     from airflow.timetables.base import Timetable as CoreTimetable
 
@@ -145,6 +146,32 @@ def find_registered_custom_partition_mapper(importable_string: str) -> type[Part
     raise PartitionMapperNotFound(importable_string)
 
 
+class DeadlineReferenceNotRegistered(ValueError):
+    """When an unregistered custom deadline reference is being accessed."""
+
+    def __init__(self, type_string: str) -> None:
+        self.type_string = type_string
+
+    def __str__(self) -> str:
+        return (
+            f"Custom deadline reference class {self.type_string!r} is not "
+            "registered. Custom deadline references must be registered via the "
+            "`deadline_references` attribute on an AirflowPlugin."
+        )
+
+
+def find_registered_custom_deadline_reference(
+    importable_string: str,
+) -> type[DeadlineReferenceType]:
+    """Find a user-defined custom deadline reference class registered via a plugin."""
+    from airflow import plugins_manager
+
+    deadline_ref_classes = plugins_manager.get_deadline_references_plugins()
+    with contextlib.suppress(KeyError):
+        return deadline_ref_classes[importable_string]
+    raise DeadlineReferenceNotRegistered(importable_string)
+
+
 def is_core_timetable_import_path(importable_string: str) -> bool:
     """Whether an importable string points to a core timetable class."""
     return importable_string.startswith("airflow.timetables.")
diff --git a/airflow-core/tests/unit/serialization/test_deadline_reference_registry.py b/airflow-core/tests/unit/serialization/test_deadline_reference_registry.py
new file mode 100644
index 00000000000..feae74bf980
--- /dev/null
+++ b/airflow-core/tests/unit/serialization/test_deadline_reference_registry.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow import plugins_manager
+from airflow.models.deadline import ReferenceModels
+from airflow.serialization.definitions.deadline import SerializedReferenceModels
+from airflow.serialization.helpers import (
+    DeadlineReferenceNotRegistered,
+    find_registered_custom_deadline_reference,
+)
+
+
+class _RegisteredCustomReference(ReferenceModels.BaseDeadlineReference):
+    """Fake deadline reference registered through a plugin in these tests."""
+
+    required_kwargs: set[str] = set()
+
+    @classmethod
+    def deserialize_reference(cls, reference_data: dict):
+        return cls()
+
+    def serialize_reference(self) -> dict:
+        return {}
+
+    def _evaluate_with(self, *, session, **kwargs):
+        return None
+
+
+_IMPORTABLE = f"{_RegisteredCustomReference.__module__}._RegisteredCustomReference"
+
+
+@pytest.fixture
+def fake_plugin_registry(monkeypatch):
+    """Stub `get_deadline_references_plugins` to advertise a single registered class."""
+    registered = {_IMPORTABLE: _RegisteredCustomReference}
+    monkeypatch.setattr(
+        plugins_manager,
+        "get_deadline_references_plugins",
+        lambda: registered,
+    )
+    return registered
+
+
+def test_find_registered_returns_class(fake_plugin_registry):
+    assert find_registered_custom_deadline_reference(_IMPORTABLE) is _RegisteredCustomReference
+
+
+def test_find_registered_raises_for_unknown(fake_plugin_registry):
+    with pytest.raises(DeadlineReferenceNotRegistered) as exc_info:
+        find_registered_custom_deadline_reference("not.registered.SomeReference")
+    assert exc_info.value.type_string == "not.registered.SomeReference"
+    assert "not.registered.SomeReference" in str(exc_info.value)
+
+
+def test_find_registered_raises_when_registry_empty(monkeypatch):
+    monkeypatch.setattr(
+        plugins_manager,
+        "get_deadline_references_plugins",
+        lambda: {},
+    )
+    with pytest.raises(DeadlineReferenceNotRegistered):
+        find_registered_custom_deadline_reference("anything.at.all.MyReference")
+
+
+def test_serialized_custom_reference_uses_registry(fake_plugin_registry):
+    result = SerializedReferenceModels.SerializedCustomReference.deserialize_reference(
+        {"__class_path": _IMPORTABLE}
+    )
+
+    assert isinstance(result, SerializedReferenceModels.SerializedCustomReference)
+    assert isinstance(result.inner_ref, _RegisteredCustomReference)
+
+
+def test_serialized_custom_reference_rejects_unregistered(monkeypatch):
+    monkeypatch.setattr(
+        plugins_manager,
+        "get_deadline_references_plugins",
+        lambda: {},
+    )
+    with pytest.raises(DeadlineReferenceNotRegistered):
+        SerializedReferenceModels.SerializedCustomReference.deserialize_reference(
+            {"__class_path": "some.other.module.UnregisteredReference"}
+        )
diff --git a/shared/plugins_manager/src/airflow_shared/plugins_manager/plugins_manager.py b/shared/plugins_manager/src/airflow_shared/plugins_manager/plugins_manager.py
index 8fcc5c9c808..1632ccd7ac9 100644
--- a/shared/plugins_manager/src/airflow_shared/plugins_manager/plugins_manager.py
+++ b/shared/plugins_manager/src/airflow_shared/plugins_manager/plugins_manager.py
@@ -122,6 +122,9 @@ class AirflowPlugin:
     # A list of timetable classes that can be used for Dag scheduling.
     partition_mappers: list[Any] = []
 
+    # A list of deadline reference classes that can be used as custom deadlines in Dags.
+    deadline_references: list[Any] = []
+
     # A list of listeners that can be used for tracking task and Dag states.
     listeners: list[ModuleType | object] = []
 

Reply via email to