This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4148883ee3 Deprecate `DummyOperator` in favor of `EmptyOperator` 
(#22832)
4148883ee3 is described below

commit 4148883ee31b55e368deb8da958e78d889abbe22
Author: eladkal <[email protected]>
AuthorDate: Wed Apr 13 09:47:56 2022 +0300

    Deprecate `DummyOperator` in favor of `EmptyOperator` (#22832)
    
    * Deprecate `DummyOperator` in favor of `EmptyOperator`
---
 airflow/decorators/base.py                         |  2 +-
 airflow/models/abstractoperator.py                 |  4 +-
 airflow/models/baseoperator.py                     | 10 ++---
 airflow/models/dagrun.py                           |  2 +-
 airflow/models/mappedoperator.py                   | 10 ++---
 airflow/operators/dummy.py                         | 36 +++++++++------
 airflow/operators/{dummy.py => empty.py}           |  9 ++--
 airflow/serialization/serialized_objects.py        | 11 +++--
 dev/provider_packages/prepare_provider_packages.py | 10 +++++
 .../dummy.py => tests/operators/test_dummy.py      | 22 +++------
 tests/serialization/test_dag_serialization.py      | 52 +++++++++++++++++++---
 11 files changed, 108 insertions(+), 60 deletions(-)

diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 678e145eba..3d66d01c91 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -359,7 +359,7 @@ class _TaskDecorator(Generic[Function, OperatorSubclass]):
             
template_fields_renderers=self.operator_class.template_fields_renderers,
             ui_color=self.operator_class.ui_color,
             ui_fgcolor=self.operator_class.ui_fgcolor,
-            is_dummy=False,
+            is_empty=False,
             task_module=self.operator_class.__module__,
             task_type=self.operator_class.__name__,
             dag=dag,
diff --git a/airflow/models/abstractoperator.py 
b/airflow/models/abstractoperator.py
index 51e0af4eb6..c64b8554af 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -107,7 +107,7 @@ class AbstractOperator(LoggingMixin, DAGNode):
             'dag',  # We show dag_id, don't need to show this too
             'node_id',  # Duplicates task_id
             'task_group',  # Doesn't have a useful repr, no point showing in UI
-            'inherits_from_dummy_operator',  # impl detail
+            'inherits_from_empty_operator',  # impl detail
             # For compatibility with TG, for operators these are just the 
current task, no point showing
             'roots',
             'leaves',
@@ -128,7 +128,7 @@ class AbstractOperator(LoggingMixin, DAGNode):
         raise NotImplementedError()
 
     @property
-    def inherits_from_dummy_operator(self) -> bool:
+    def inherits_from_empty_operator(self) -> bool:
         raise NotImplementedError()
 
     @property
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 4433e9aca0..4b3ccdde8b 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1459,12 +1459,12 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
     is_mapped: ClassVar[bool] = False
 
     @property
-    def inherits_from_dummy_operator(self):
-        """Used to determine if an Operator is inherited from DummyOperator"""
-        # This looks like `isinstance(self, DummyOperator) would work, but 
this also
-        # needs to cope when `self` is a Serialized instance of a 
DummyOperator or one
+    def inherits_from_empty_operator(self):
+        """Used to determine if an Operator is inherited from EmptyOperator"""
+        # This looks like `isinstance(self, EmptyOperator) would work, but 
this also
+        # needs to cope when `self` is a Serialized instance of a 
EmptyOperator or one
         # of its sub-classes (which don't inherit from anything but 
BaseOperator).
-        return getattr(self, '_is_dummy', False)
+        return getattr(self, '_is_empty', False)
 
     def defer(
         self,
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index b5d91e74b1..74cbce5399 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1016,7 +1016,7 @@ class DagRun(Base, LoggingMixin):
         schedulable_ti_ids = []
         for ti in schedulable_tis:
             if (
-                ti.task.inherits_from_dummy_operator
+                ti.task.inherits_from_empty_operator
                 and not ti.task.on_execute_callback
                 and not ti.task.on_success_callback
             ):
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 1ac4ff89fa..15ecee8798 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -190,7 +190,7 @@ class OperatorPartial:
     def expand(self, **mapped_kwargs: "Mappable") -> "MappedOperator":
         self._expand_called = True
 
-        from airflow.operators.dummy import DummyOperator
+        from airflow.operators.empty import EmptyOperator
 
         validate_mapping_kwargs(self.operator_class, "expand", mapped_kwargs)
         prevent_duplicates(self.kwargs, mapped_kwargs, fail_reason="mapping 
already partial")
@@ -217,7 +217,7 @@ class OperatorPartial:
             
template_fields_renderers=self.operator_class.template_fields_renderers,
             ui_color=self.operator_class.ui_color,
             ui_fgcolor=self.operator_class.ui_fgcolor,
-            is_dummy=issubclass(self.operator_class, DummyOperator),
+            is_empty=issubclass(self.operator_class, EmptyOperator),
             task_module=self.operator_class.__module__,
             task_type=self.operator_class.__name__,
             dag=dag,
@@ -255,7 +255,7 @@ class MappedOperator(AbstractOperator):
     template_fields_renderers: Dict[str, str]
     ui_color: str
     ui_fgcolor: str
-    _is_dummy: bool
+    _is_empty: bool
     _task_module: str
     _task_type: str
 
@@ -350,9 +350,9 @@ class MappedOperator(AbstractOperator):
         return self._task_type
 
     @property
-    def inherits_from_dummy_operator(self) -> bool:
+    def inherits_from_empty_operator(self) -> bool:
         """Implementing Operator."""
-        return self._is_dummy
+        return self._is_empty
 
     @property
     def roots(self) -> Sequence[AbstractOperator]:
diff --git a/airflow/operators/dummy.py b/airflow/operators/dummy.py
index 6cd0111dcf..b2912e9258 100644
--- a/airflow/operators/dummy.py
+++ b/airflow/operators/dummy.py
@@ -15,23 +15,31 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from airflow.models.baseoperator import BaseOperator
-from airflow.utils.context import Context
+"""This module is deprecated. Please use :mod:`airflow.operators.empty`."""
 
+import warnings
 
-class DummyOperator(BaseOperator):
-    """
-    Operator that does literally nothing. It can be used to group tasks in a
-    DAG.
+from airflow.operators.empty import EmptyOperator
 
-    The task is evaluated by the scheduler but never processed by the executor.
-    """
+warnings.warn(
+    "This module is deprecated. Please use `airflow.operators.empty`.",
+    DeprecationWarning,
+    stacklevel=2,
+)
 
-    ui_color = '#e8f7e4'
-    inherits_from_dummy_operator = True
 
-    def __init__(self, **kwargs) -> None:
-        super().__init__(**kwargs)
+class DummyOperator(EmptyOperator):
+    """This class is deprecated. Please use 
`airflow.operators.empty.EmptyOperator`."""
+
+    @property
+    def inherits_from_dummy_operator(self):
+        return True
 
-    def execute(self, context: Context):
-        pass
+    def __init__(self, **kwargs):
+        warnings.warn(
+            """This class is deprecated. Please use 
`airflow.operators.empty.EmptyOperator`.""",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        self.inherits_from_empty_operator = self.inherits_from_dummy_operator
+        super().__init__(**kwargs)
diff --git a/airflow/operators/dummy.py b/airflow/operators/empty.py
similarity index 88%
copy from airflow/operators/dummy.py
copy to airflow/operators/empty.py
index 6cd0111dcf..1f396d0f6a 100644
--- a/airflow/operators/dummy.py
+++ b/airflow/operators/empty.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,11 +14,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 from airflow.models.baseoperator import BaseOperator
 from airflow.utils.context import Context
 
 
-class DummyOperator(BaseOperator):
+class EmptyOperator(BaseOperator):
     """
     Operator that does literally nothing. It can be used to group tasks in a
     DAG.
@@ -28,10 +28,7 @@ class DummyOperator(BaseOperator):
     """
 
     ui_color = '#e8f7e4'
-    inherits_from_dummy_operator = True
-
-    def __init__(self, **kwargs) -> None:
-        super().__init__(**kwargs)
+    inherits_from_empty_operator = True
 
     def execute(self, context: Context):
         pass
diff --git a/airflow/serialization/serialized_objects.py 
b/airflow/serialization/serialized_objects.py
index e8e522ced8..c1ed4d8c6e 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -628,8 +628,8 @@ class SerializedBaseOperator(BaseOperator, 
BaseSerialization):
         serialize_op['_task_type'] = getattr(op, "_task_type", 
type(op).__name__)
         serialize_op['_task_module'] = getattr(op, "_task_module", 
type(op).__module__)
 
-        # Used to determine if an Operator is inherited from DummyOperator
-        serialize_op['_is_dummy'] = op.inherits_from_dummy_operator
+        # Used to determine if an Operator is inherited from EmptyOperator
+        serialize_op['_is_empty'] = op.inherits_from_empty_operator
 
         if op.operator_extra_links:
             serialize_op['_operator_extra_links'] = 
cls._serialize_operator_extra_links(
@@ -712,6 +712,9 @@ class SerializedBaseOperator(BaseOperator, 
BaseSerialization):
                 setattr(op, "operator_extra_links", 
list(op_extra_links_from_plugin.values()))
 
         for k, v in encoded_op.items():
+            # Todo: TODO: Remove in Airflow 3.0 when dummy operator is removed
+            if k == "_is_dummy":
+                k = "_is_empty"
             if k == "_downstream_task_ids":
                 # Upgrade from old format/name
                 k = "downstream_task_ids"
@@ -774,7 +777,7 @@ class SerializedBaseOperator(BaseOperator, 
BaseSerialization):
                 setattr(op, field, None)
 
         # Used to determine if an Operator is inherited from DummyOperator
-        setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy", False)))
+        setattr(op, "_is_empty", bool(encoded_op.get("_is_empty", False)))
 
     @classmethod
     def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Operator:
@@ -796,7 +799,7 @@ class SerializedBaseOperator(BaseOperator, 
BaseSerialization):
                 
template_fields_renderers=BaseOperator.template_fields_renderers,
                 ui_color=BaseOperator.ui_color,
                 ui_fgcolor=BaseOperator.ui_fgcolor,
-                is_dummy=False,
+                is_empty=False,
                 task_module=encoded_op["_task_module"],
                 task_type=encoded_op["_task_type"],
                 dag=None,
diff --git a/dev/provider_packages/prepare_provider_packages.py 
b/dev/provider_packages/prepare_provider_packages.py
index 4cb8e58f05..359ae8db1d 100755
--- a/dev/provider_packages/prepare_provider_packages.py
+++ b/dev/provider_packages/prepare_provider_packages.py
@@ -2155,6 +2155,16 @@ KNOWN_DEPRECATED_MESSAGES: Set[Tuple[str, str]] = {
     ),
     ("SelectableGroups dict interface is deprecated. Use select.", "kombu"),
     ("The module cloudant is now deprecated. The replacement is ibmcloudant.", 
"cloudant"),
+    ("This module is deprecated. Please use `airflow.operators.empty`.", 
"dbt"),
+    ("This module is deprecated. Please use `airflow.operators.empty`.", 
"jdbc"),
+    ("This module is deprecated. Please use `airflow.operators.empty`.", 
"azure"),
+    ("This module is deprecated. Please use `airflow.operators.empty`.", 
"qubole"),
+    ("This module is deprecated. Please use `airflow.operators.empty`.", 
"winrm"),
+    ("This class is deprecated. Please use 
`airflow.operators.empty.EmptyOperator`.", "dbt"),
+    ("This class is deprecated. Please use 
`airflow.operators.empty.EmptyOperator`.", "jdbc"),
+    ("This class is deprecated. Please use 
`airflow.operators.empty.EmptyOperator`.", "azure"),
+    ("This class is deprecated. Please use 
`airflow.operators.empty.EmptyOperator`.", "qubole"),
+    ("This class is deprecated. Please use 
`airflow.operators.empty.EmptyOperator`.", "winrm"),
 }
 
 KNOWN_COMMON_DEPRECATED_MESSAGES: Set[str] = {
diff --git a/airflow/operators/dummy.py b/tests/operators/test_dummy.py
similarity index 60%
copy from airflow/operators/dummy.py
copy to tests/operators/test_dummy.py
index 6cd0111dcf..ab9538f8f4 100644
--- a/airflow/operators/dummy.py
+++ b/tests/operators/test_dummy.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,23 +14,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from airflow.models.baseoperator import BaseOperator
-from airflow.utils.context import Context
-
-
-class DummyOperator(BaseOperator):
-    """
-    Operator that does literally nothing. It can be used to group tasks in a
-    DAG.
 
-    The task is evaluated by the scheduler but never processed by the executor.
-    """
+import pytest
 
-    ui_color = '#e8f7e4'
-    inherits_from_dummy_operator = True
 
-    def __init__(self, **kwargs) -> None:
-        super().__init__(**kwargs)
+def test_deprecation_warnings_generated():
+    from airflow.operators.dummy import DummyOperator
 
-    def execute(self, context: Context):
-        pass
+    with pytest.warns(expected_warning=DeprecationWarning):
+        DummyOperator(task_id='my_task')
diff --git a/tests/serialization/test_dag_serialization.py 
b/tests/serialization/test_dag_serialization.py
index e2661eb68f..e072ecd4ee 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -103,7 +103,7 @@ serialized_simple_dag_ground_truth = {
                 "sla": 100.0,
                 "downstream_task_ids": [],
                 "_inlets": [],
-                "_is_dummy": False,
+                "_is_empty": False,
                 "_outlets": [],
                 "ui_color": "#f0ede4",
                 "ui_fgcolor": "#000",
@@ -133,7 +133,7 @@ serialized_simple_dag_ground_truth = {
                 "sla": 100.0,
                 "downstream_task_ids": [],
                 "_inlets": [],
-                "_is_dummy": False,
+                "_is_empty": False,
                 "_outlets": [],
                 "_operator_extra_links": 
[{"tests.test_utils.mock_operators.CustomOpLink": {}}],
                 "ui_color": "#fff",
@@ -1620,7 +1620,7 @@ def test_mapped_operator_serde():
     serialized = SerializedBaseOperator._serialize(real_op)
 
     assert serialized == {
-        '_is_dummy': False,
+        '_is_empty': False,
         '_is_mapped': True,
         '_task_module': 'airflow.operators.bash',
         '_task_type': 'BashOperator',
@@ -1677,7 +1677,7 @@ def test_mapped_operator_xcomarg_serde():
 
     serialized = SerializedBaseOperator._serialize(mapped)
     assert serialized == {
-        '_is_dummy': False,
+        '_is_empty': False,
         '_is_mapped': True,
         '_task_module': 'tests.test_utils.mock_operators',
         '_task_type': 'MockOperator',
@@ -1756,7 +1756,7 @@ def test_mapped_decorator_serde():
 
     serialized = SerializedBaseOperator._serialize(original)
     assert serialized == {
-        '_is_dummy': False,
+        '_is_empty': False,
         '_is_mapped': True,
         '_task_module': 'airflow.decorators.python',
         '_task_type': '_PythonDecoratedOperator',
@@ -1796,3 +1796,45 @@ def test_mapped_decorator_serde():
         "op_kwargs": {"arg1": [1, 2, {"a": "b"}]},
         "retry_delay": timedelta(seconds=30),
     }
+
+
[email protected]("ignore::DeprecationWarning")
[email protected](
+    "is_inherit",
+    [
+        True,
+        False,
+    ],
+)
+def test_dummy_operator_serde(is_inherit):
+    """
+    Test to verify that when user uses custom DummyOperator with 
inherits_from_dummy_operator
+    we will have _is_empty in serialized operator.
+    """
+
+    # In this test we should NOT switch the DummyOperator to EmptyOperator.
+    # This test can be removed in Airflow 3.0 as DummyOperator will be removed 
then.
+    from airflow.operators.dummy import DummyOperator
+
+    class MyDummyOperator(DummyOperator):
+        inherits_from_dummy_operator = is_inherit
+
+    op = MyDummyOperator(task_id='my_task')
+
+    serialized = SerializedBaseOperator._serialize(op)
+
+    assert serialized == {
+        '_is_empty': is_inherit,
+        '_task_module': 'tests.serialization.test_dag_serialization',
+        '_task_type': 'MyDummyOperator',
+        '_outlets': [],
+        '_inlets': [],
+        'downstream_task_ids': [],
+        "pool": "default_pool",
+        'task_id': 'my_task',
+        'ui_color': '#e8f7e4',
+        'ui_fgcolor': '#000',
+        'template_ext': [],
+        'template_fields': [],
+        'template_fields_renderers': {},
+    }

Reply via email to