This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 84c9f4bf70 Add built-in Extrenal Link for ExternalTaskMarker operator
(#23964)
84c9f4bf70 is described below
commit 84c9f4bf70cbc2f4ba19fdc5aa88791500d4daaa
Author: Andrey Anshin <[email protected]>
AuthorDate: Sun Jun 5 00:40:53 2022 +0300
Add built-in Extrenal Link for ExternalTaskMarker operator (#23964)
---
airflow/sensors/external_task.py | 31 +++++++++++++++++++++++++----
airflow/serialization/serialized_objects.py | 3 ++-
tests/sensors/test_external_task_sensor.py | 10 +++++++++-
3 files changed, 38 insertions(+), 6 deletions(-)
diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 40c0a7a566..9bb074d47c 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -18,8 +18,10 @@
import datetime
import os
+import warnings
from typing import TYPE_CHECKING, Any, Callable, Collection, FrozenSet,
Iterable, Optional, Union
+import attr
from sqlalchemy import func
from airflow.exceptions import AirflowException
@@ -31,10 +33,10 @@ from airflow.utils.session import provide_session
from airflow.utils.state import State
-class ExternalTaskSensorLink(BaseOperatorLink):
+class ExternalDagLink(BaseOperatorLink):
"""
- Operator link for ExternalTaskSensor. It allows users to access
- DAG waited with ExternalTaskSensor.
+ Operator link for ExternalTaskSensor and ExternalTaskMarker.
+ It allows users to access DAG waited with ExternalTaskSensor or cleared by
ExternalTaskMarker.
"""
name = 'External DAG'
@@ -83,7 +85,7 @@ class ExternalTaskSensor(BaseSensorOperator):
@property
def operator_extra_links(self):
"""Return operator extra links"""
- return [ExternalTaskSensorLink()]
+ return [ExternalDagLink()]
def __init__(
self,
@@ -287,6 +289,11 @@ class ExternalTaskMarker(EmptyOperator):
# The _serialized_fields are lazily loaded when get_serialized_fields()
method is called
__serialized_fields: Optional[FrozenSet[str]] = None
+ @property
+ def operator_extra_links(self):
+ """Return operator extra links"""
+ return [ExternalDagLink()]
+
def __init__(
self,
*,
@@ -318,3 +325,19 @@ class ExternalTaskMarker(EmptyOperator):
if not cls.__serialized_fields:
cls.__serialized_fields =
frozenset(super().get_serialized_fields() | {"recursion_depth"})
return cls.__serialized_fields
+
+
[email protected](auto_attribs=True)
+class ExternalTaskSensorLink(ExternalDagLink):
+ """
+ This external link is deprecated.
+ Please use :class:`airflow.sensors.external_task.ExternalDagLink`.
+ """
+
+ def __attrs_post_init__(self):
+ warnings.warn(
+ "This external link is deprecated. "
+ "Please use
:class:`airflow.sensors.external_task.ExternalDagLink`.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
diff --git a/airflow/serialization/serialized_objects.py
b/airflow/serialization/serialized_objects.py
index 3e674b2f8d..85286688f5 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -70,8 +70,9 @@ log = logging.getLogger(__name__)
_OPERATOR_EXTRA_LINKS: Set[str] = {
"airflow.operators.trigger_dagrun.TriggerDagRunLink",
- "airflow.sensors.external_task.ExternalTaskSensorLink",
+ "airflow.sensors.external_task.ExternalDagLink",
# Deprecated names, so that existing serialized dags load straight away.
+ "airflow.sensors.external_task.ExternalTaskSensorLink",
"airflow.operators.dagrun_operator.TriggerDagRunLink",
"airflow.sensors.external_task_sensor.ExternalTaskSensorLink",
}
diff --git a/tests/sensors/test_external_task_sensor.py
b/tests/sensors/test_external_task_sensor.py
index 8725d76081..15c78083d4 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -27,7 +27,7 @@ from airflow.models import DagBag, DagRun, TaskInstance
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskMarker,
ExternalTaskSensor
+from airflow.sensors.external_task import ExternalTaskMarker,
ExternalTaskSensor, ExternalTaskSensorLink
from airflow.sensors.time_sensor import TimeSensor
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils.session import provide_session
@@ -977,3 +977,11 @@ def
test_clear_overlapping_external_task_marker(dag_bag_head_tail, session):
)
== 30
)
+
+
+class TestExternalTaskSensorLink:
+ def test_deprecation_warning(self):
+ with pytest.warns(DeprecationWarning) as warnings:
+ ExternalTaskSensorLink()
+ assert len(warnings) == 1
+ assert warnings[0].filename == __file__