This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 84c9f4bf70 Add built-in Extrenal Link for ExternalTaskMarker operator (#23964)
84c9f4bf70 is described below

commit 84c9f4bf70cbc2f4ba19fdc5aa88791500d4daaa
Author: Andrey Anshin <[email protected]>
AuthorDate: Sun Jun 5 00:40:53 2022 +0300

    Add built-in Extrenal Link for ExternalTaskMarker operator (#23964)
---
 airflow/sensors/external_task.py            | 31 +++++++++++++++++++++++++----
 airflow/serialization/serialized_objects.py |  3 ++-
 tests/sensors/test_external_task_sensor.py  | 10 +++++++++-
 3 files changed, 38 insertions(+), 6 deletions(-)

diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 40c0a7a566..9bb074d47c 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -18,8 +18,10 @@
 
 import datetime
 import os
+import warnings
 from typing import TYPE_CHECKING, Any, Callable, Collection, FrozenSet, Iterable, Optional, Union
 
+import attr
 from sqlalchemy import func
 
 from airflow.exceptions import AirflowException
@@ -31,10 +33,10 @@ from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
-class ExternalTaskSensorLink(BaseOperatorLink):
+class ExternalDagLink(BaseOperatorLink):
     """
-    Operator link for ExternalTaskSensor. It allows users to access
-    DAG waited with ExternalTaskSensor.
+    Operator link for ExternalTaskSensor and ExternalTaskMarker.
+    It allows users to access DAG waited with ExternalTaskSensor or cleared by ExternalTaskMarker.
     """
 
     name = 'External DAG'
@@ -83,7 +85,7 @@ class ExternalTaskSensor(BaseSensorOperator):
     @property
     def operator_extra_links(self):
         """Return operator extra links"""
-        return [ExternalTaskSensorLink()]
+        return [ExternalDagLink()]
 
     def __init__(
         self,
@@ -287,6 +289,11 @@ class ExternalTaskMarker(EmptyOperator):
     # The _serialized_fields are lazily loaded when get_serialized_fields() method is called
     __serialized_fields: Optional[FrozenSet[str]] = None
 
+    @property
+    def operator_extra_links(self):
+        """Return operator extra links"""
+        return [ExternalDagLink()]
+
     def __init__(
         self,
         *,
@@ -318,3 +325,19 @@ class ExternalTaskMarker(EmptyOperator):
         if not cls.__serialized_fields:
             cls.__serialized_fields = frozenset(super().get_serialized_fields() | {"recursion_depth"})
         return cls.__serialized_fields
+
+
+@attr.s(auto_attribs=True)
+class ExternalTaskSensorLink(ExternalDagLink):
+    """
+    This external link is deprecated.
+    Please use :class:`airflow.sensors.external_task.ExternalDagLink`.
+    """
+
+    def __attrs_post_init__(self):
+        warnings.warn(
+            "This external link is deprecated. "
+            "Please use :class:`airflow.sensors.external_task.ExternalDagLink`.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 3e674b2f8d..85286688f5 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -70,8 +70,9 @@ log = logging.getLogger(__name__)
 
 _OPERATOR_EXTRA_LINKS: Set[str] = {
     "airflow.operators.trigger_dagrun.TriggerDagRunLink",
-    "airflow.sensors.external_task.ExternalTaskSensorLink",
+    "airflow.sensors.external_task.ExternalDagLink",
     # Deprecated names, so that existing serialized dags load straight away.
+    "airflow.sensors.external_task.ExternalTaskSensorLink",
     "airflow.operators.dagrun_operator.TriggerDagRunLink",
     "airflow.sensors.external_task_sensor.ExternalTaskSensorLink",
 }
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 8725d76081..15c78083d4 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -27,7 +27,7 @@ from airflow.models import DagBag, DagRun, TaskInstance
 from airflow.models.dag import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
+from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor, ExternalTaskSensorLink
 from airflow.sensors.time_sensor import TimeSensor
 from airflow.serialization.serialized_objects import SerializedBaseOperator
 from airflow.utils.session import provide_session
@@ -977,3 +977,11 @@ def test_clear_overlapping_external_task_marker(dag_bag_head_tail, session):
         )
         == 30
     )
+
+
+class TestExternalTaskSensorLink:
+    def test_deprecation_warning(self):
+        with pytest.warns(DeprecationWarning) as warnings:
+            ExternalTaskSensorLink()
+            assert len(warnings) == 1
+            assert warnings[0].filename == __file__

Reply via email to