This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4148883ee3 Deprecate `DummyOperator` in favor of `EmptyOperator`
(#22832)
4148883ee3 is described below
commit 4148883ee31b55e368deb8da958e78d889abbe22
Author: eladkal <[email protected]>
AuthorDate: Wed Apr 13 09:47:56 2022 +0300
Deprecate `DummyOperator` in favor of `EmptyOperator` (#22832)
* Deprecate `DummyOperator` in favor of `EmptyOperator`
---
airflow/decorators/base.py | 2 +-
airflow/models/abstractoperator.py | 4 +-
airflow/models/baseoperator.py | 10 ++---
airflow/models/dagrun.py | 2 +-
airflow/models/mappedoperator.py | 10 ++---
airflow/operators/dummy.py | 36 +++++++++------
airflow/operators/{dummy.py => empty.py} | 9 ++--
airflow/serialization/serialized_objects.py | 11 +++--
dev/provider_packages/prepare_provider_packages.py | 10 +++++
.../dummy.py => tests/operators/test_dummy.py | 22 +++------
tests/serialization/test_dag_serialization.py | 52 +++++++++++++++++++---
11 files changed, 108 insertions(+), 60 deletions(-)
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 678e145eba..3d66d01c91 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -359,7 +359,7 @@ class _TaskDecorator(Generic[Function, OperatorSubclass]):
template_fields_renderers=self.operator_class.template_fields_renderers,
ui_color=self.operator_class.ui_color,
ui_fgcolor=self.operator_class.ui_fgcolor,
- is_dummy=False,
+ is_empty=False,
task_module=self.operator_class.__module__,
task_type=self.operator_class.__name__,
dag=dag,
diff --git a/airflow/models/abstractoperator.py
b/airflow/models/abstractoperator.py
index 51e0af4eb6..c64b8554af 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -107,7 +107,7 @@ class AbstractOperator(LoggingMixin, DAGNode):
'dag', # We show dag_id, don't need to show this too
'node_id', # Duplicates task_id
'task_group', # Doesn't have a useful repr, no point showing in UI
- 'inherits_from_dummy_operator', # impl detail
+ 'inherits_from_empty_operator', # impl detail
# For compatibility with TG, for operators these are just the
current task, no point showing
'roots',
'leaves',
@@ -128,7 +128,7 @@ class AbstractOperator(LoggingMixin, DAGNode):
raise NotImplementedError()
@property
- def inherits_from_dummy_operator(self) -> bool:
+ def inherits_from_empty_operator(self) -> bool:
raise NotImplementedError()
@property
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 4433e9aca0..4b3ccdde8b 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1459,12 +1459,12 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
is_mapped: ClassVar[bool] = False
@property
- def inherits_from_dummy_operator(self):
- """Used to determine if an Operator is inherited from DummyOperator"""
- # This looks like `isinstance(self, DummyOperator) would work, but
this also
- # needs to cope when `self` is a Serialized instance of a
DummyOperator or one
+ def inherits_from_empty_operator(self):
+ """Used to determine if an Operator is inherited from EmptyOperator"""
+ # This looks like `isinstance(self, EmptyOperator)` would work, but
this also
+ # needs to cope when `self` is a Serialized instance of an
EmptyOperator or one
# of its sub-classes (which don't inherit from anything but
BaseOperator).
- return getattr(self, '_is_dummy', False)
+ return getattr(self, '_is_empty', False)
def defer(
self,
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index b5d91e74b1..74cbce5399 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1016,7 +1016,7 @@ class DagRun(Base, LoggingMixin):
schedulable_ti_ids = []
for ti in schedulable_tis:
if (
- ti.task.inherits_from_dummy_operator
+ ti.task.inherits_from_empty_operator
and not ti.task.on_execute_callback
and not ti.task.on_success_callback
):
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 1ac4ff89fa..15ecee8798 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -190,7 +190,7 @@ class OperatorPartial:
def expand(self, **mapped_kwargs: "Mappable") -> "MappedOperator":
self._expand_called = True
- from airflow.operators.dummy import DummyOperator
+ from airflow.operators.empty import EmptyOperator
validate_mapping_kwargs(self.operator_class, "expand", mapped_kwargs)
prevent_duplicates(self.kwargs, mapped_kwargs, fail_reason="mapping
already partial")
@@ -217,7 +217,7 @@ class OperatorPartial:
template_fields_renderers=self.operator_class.template_fields_renderers,
ui_color=self.operator_class.ui_color,
ui_fgcolor=self.operator_class.ui_fgcolor,
- is_dummy=issubclass(self.operator_class, DummyOperator),
+ is_empty=issubclass(self.operator_class, EmptyOperator),
task_module=self.operator_class.__module__,
task_type=self.operator_class.__name__,
dag=dag,
@@ -255,7 +255,7 @@ class MappedOperator(AbstractOperator):
template_fields_renderers: Dict[str, str]
ui_color: str
ui_fgcolor: str
- _is_dummy: bool
+ _is_empty: bool
_task_module: str
_task_type: str
@@ -350,9 +350,9 @@ class MappedOperator(AbstractOperator):
return self._task_type
@property
- def inherits_from_dummy_operator(self) -> bool:
+ def inherits_from_empty_operator(self) -> bool:
"""Implementing Operator."""
- return self._is_dummy
+ return self._is_empty
@property
def roots(self) -> Sequence[AbstractOperator]:
diff --git a/airflow/operators/dummy.py b/airflow/operators/dummy.py
index 6cd0111dcf..b2912e9258 100644
--- a/airflow/operators/dummy.py
+++ b/airflow/operators/dummy.py
@@ -15,23 +15,31 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from airflow.models.baseoperator import BaseOperator
-from airflow.utils.context import Context
+"""This module is deprecated. Please use :mod:`airflow.operators.empty`."""
+import warnings
-class DummyOperator(BaseOperator):
- """
- Operator that does literally nothing. It can be used to group tasks in a
- DAG.
+from airflow.operators.empty import EmptyOperator
- The task is evaluated by the scheduler but never processed by the executor.
- """
+warnings.warn(
+ "This module is deprecated. Please use `airflow.operators.empty`.",
+ DeprecationWarning,
+ stacklevel=2,
+)
- ui_color = '#e8f7e4'
- inherits_from_dummy_operator = True
- def __init__(self, **kwargs) -> None:
- super().__init__(**kwargs)
+class DummyOperator(EmptyOperator):
+ """This class is deprecated. Please use
`airflow.operators.empty.EmptyOperator`."""
+
+ @property
+ def inherits_from_dummy_operator(self):
+ return True
- def execute(self, context: Context):
- pass
+ def __init__(self, **kwargs):
+ warnings.warn(
+ """This class is deprecated. Please use
`airflow.operators.empty.EmptyOperator`.""",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ self.inherits_from_empty_operator = self.inherits_from_dummy_operator
+ super().__init__(**kwargs)
diff --git a/airflow/operators/dummy.py b/airflow/operators/empty.py
similarity index 88%
copy from airflow/operators/dummy.py
copy to airflow/operators/empty.py
index 6cd0111dcf..1f396d0f6a 100644
--- a/airflow/operators/dummy.py
+++ b/airflow/operators/empty.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,11 +14,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
-class DummyOperator(BaseOperator):
+class EmptyOperator(BaseOperator):
"""
Operator that does literally nothing. It can be used to group tasks in a
DAG.
@@ -28,10 +28,7 @@ class DummyOperator(BaseOperator):
"""
ui_color = '#e8f7e4'
- inherits_from_dummy_operator = True
-
- def __init__(self, **kwargs) -> None:
- super().__init__(**kwargs)
+ inherits_from_empty_operator = True
def execute(self, context: Context):
pass
diff --git a/airflow/serialization/serialized_objects.py
b/airflow/serialization/serialized_objects.py
index e8e522ced8..c1ed4d8c6e 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -628,8 +628,8 @@ class SerializedBaseOperator(BaseOperator,
BaseSerialization):
serialize_op['_task_type'] = getattr(op, "_task_type",
type(op).__name__)
serialize_op['_task_module'] = getattr(op, "_task_module",
type(op).__module__)
- # Used to determine if an Operator is inherited from DummyOperator
- serialize_op['_is_dummy'] = op.inherits_from_dummy_operator
+ # Used to determine if an Operator is inherited from EmptyOperator
+ serialize_op['_is_empty'] = op.inherits_from_empty_operator
if op.operator_extra_links:
serialize_op['_operator_extra_links'] =
cls._serialize_operator_extra_links(
@@ -712,6 +712,9 @@ class SerializedBaseOperator(BaseOperator,
BaseSerialization):
setattr(op, "operator_extra_links",
list(op_extra_links_from_plugin.values()))
for k, v in encoded_op.items():
+ # TODO: Remove in Airflow 3.0 when dummy operator is removed
+ if k == "_is_dummy":
+ k = "_is_empty"
if k == "_downstream_task_ids":
# Upgrade from old format/name
k = "downstream_task_ids"
@@ -774,7 +777,7 @@ class SerializedBaseOperator(BaseOperator,
BaseSerialization):
setattr(op, field, None)
# Used to determine if an Operator is inherited from DummyOperator
- setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy", False)))
+ setattr(op, "_is_empty", bool(encoded_op.get("_is_empty", False)))
@classmethod
def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Operator:
@@ -796,7 +799,7 @@ class SerializedBaseOperator(BaseOperator,
BaseSerialization):
template_fields_renderers=BaseOperator.template_fields_renderers,
ui_color=BaseOperator.ui_color,
ui_fgcolor=BaseOperator.ui_fgcolor,
- is_dummy=False,
+ is_empty=False,
task_module=encoded_op["_task_module"],
task_type=encoded_op["_task_type"],
dag=None,
diff --git a/dev/provider_packages/prepare_provider_packages.py
b/dev/provider_packages/prepare_provider_packages.py
index 4cb8e58f05..359ae8db1d 100755
--- a/dev/provider_packages/prepare_provider_packages.py
+++ b/dev/provider_packages/prepare_provider_packages.py
@@ -2155,6 +2155,16 @@ KNOWN_DEPRECATED_MESSAGES: Set[Tuple[str, str]] = {
),
("SelectableGroups dict interface is deprecated. Use select.", "kombu"),
("The module cloudant is now deprecated. The replacement is ibmcloudant.",
"cloudant"),
+ ("This module is deprecated. Please use `airflow.operators.empty`.",
"dbt"),
+ ("This module is deprecated. Please use `airflow.operators.empty`.",
"jdbc"),
+ ("This module is deprecated. Please use `airflow.operators.empty`.",
"azure"),
+ ("This module is deprecated. Please use `airflow.operators.empty`.",
"qubole"),
+ ("This module is deprecated. Please use `airflow.operators.empty`.",
"winrm"),
+ ("This class is deprecated. Please use
`airflow.operators.empty.EmptyOperator`.", "dbt"),
+ ("This class is deprecated. Please use
`airflow.operators.empty.EmptyOperator`.", "jdbc"),
+ ("This class is deprecated. Please use
`airflow.operators.empty.EmptyOperator`.", "azure"),
+ ("This class is deprecated. Please use
`airflow.operators.empty.EmptyOperator`.", "qubole"),
+ ("This class is deprecated. Please use
`airflow.operators.empty.EmptyOperator`.", "winrm"),
}
KNOWN_COMMON_DEPRECATED_MESSAGES: Set[str] = {
diff --git a/airflow/operators/dummy.py b/tests/operators/test_dummy.py
similarity index 60%
copy from airflow/operators/dummy.py
copy to tests/operators/test_dummy.py
index 6cd0111dcf..ab9538f8f4 100644
--- a/airflow/operators/dummy.py
+++ b/tests/operators/test_dummy.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,23 +14,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from airflow.models.baseoperator import BaseOperator
-from airflow.utils.context import Context
-
-
-class DummyOperator(BaseOperator):
- """
- Operator that does literally nothing. It can be used to group tasks in a
- DAG.
- The task is evaluated by the scheduler but never processed by the executor.
- """
+import pytest
- ui_color = '#e8f7e4'
- inherits_from_dummy_operator = True
- def __init__(self, **kwargs) -> None:
- super().__init__(**kwargs)
+def test_deprecation_warnings_generated():
+ from airflow.operators.dummy import DummyOperator
- def execute(self, context: Context):
- pass
+ with pytest.warns(expected_warning=DeprecationWarning):
+ DummyOperator(task_id='my_task')
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index e2661eb68f..e072ecd4ee 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -103,7 +103,7 @@ serialized_simple_dag_ground_truth = {
"sla": 100.0,
"downstream_task_ids": [],
"_inlets": [],
- "_is_dummy": False,
+ "_is_empty": False,
"_outlets": [],
"ui_color": "#f0ede4",
"ui_fgcolor": "#000",
@@ -133,7 +133,7 @@ serialized_simple_dag_ground_truth = {
"sla": 100.0,
"downstream_task_ids": [],
"_inlets": [],
- "_is_dummy": False,
+ "_is_empty": False,
"_outlets": [],
"_operator_extra_links":
[{"tests.test_utils.mock_operators.CustomOpLink": {}}],
"ui_color": "#fff",
@@ -1620,7 +1620,7 @@ def test_mapped_operator_serde():
serialized = SerializedBaseOperator._serialize(real_op)
assert serialized == {
- '_is_dummy': False,
+ '_is_empty': False,
'_is_mapped': True,
'_task_module': 'airflow.operators.bash',
'_task_type': 'BashOperator',
@@ -1677,7 +1677,7 @@ def test_mapped_operator_xcomarg_serde():
serialized = SerializedBaseOperator._serialize(mapped)
assert serialized == {
- '_is_dummy': False,
+ '_is_empty': False,
'_is_mapped': True,
'_task_module': 'tests.test_utils.mock_operators',
'_task_type': 'MockOperator',
@@ -1756,7 +1756,7 @@ def test_mapped_decorator_serde():
serialized = SerializedBaseOperator._serialize(original)
assert serialized == {
- '_is_dummy': False,
+ '_is_empty': False,
'_is_mapped': True,
'_task_module': 'airflow.decorators.python',
'_task_type': '_PythonDecoratedOperator',
@@ -1796,3 +1796,45 @@ def test_mapped_decorator_serde():
"op_kwargs": {"arg1": [1, 2, {"a": "b"}]},
"retry_delay": timedelta(seconds=30),
}
+
+
[email protected]("ignore::DeprecationWarning")
[email protected](
+ "is_inherit",
+ [
+ True,
+ False,
+ ],
+)
+def test_dummy_operator_serde(is_inherit):
+ """
+ Test to verify that when a user uses a custom DummyOperator with
inherits_from_dummy_operator
+ we will have _is_empty in the serialized operator.
+ """
+
+ # In this test we should NOT switch the DummyOperator to EmptyOperator.
+ # This test can be removed in Airflow 3.0 as DummyOperator will be removed
then.
+ from airflow.operators.dummy import DummyOperator
+
+ class MyDummyOperator(DummyOperator):
+ inherits_from_dummy_operator = is_inherit
+
+ op = MyDummyOperator(task_id='my_task')
+
+ serialized = SerializedBaseOperator._serialize(op)
+
+ assert serialized == {
+ '_is_empty': is_inherit,
+ '_task_module': 'tests.serialization.test_dag_serialization',
+ '_task_type': 'MyDummyOperator',
+ '_outlets': [],
+ '_inlets': [],
+ 'downstream_task_ids': [],
+ "pool": "default_pool",
+ 'task_id': 'my_task',
+ 'ui_color': '#e8f7e4',
+ 'ui_fgcolor': '#000',
+ 'template_ext': [],
+ 'template_fields': [],
+ 'template_fields_renderers': {},
+ }