This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f12e7e4a9 Make the decorators of `PythonOperator` sub-classes extend its decorator (#32845)
8f12e7e4a9 is described below

commit 8f12e7e4a9374e886965f3134aa801a5a267a36d
Author: Hussein Awala <[email protected]>
AuthorDate: Mon Jul 31 22:15:24 2023 +0200

    Make the decorators of `PythonOperator` sub-classes extend its decorator (#32845)
    
    * Fix template_fields in the decorators of PythonOperator subclasses
    
    * Make the decorators classes extend _PythonDecoratedOperator instead of DecoratedOperator
---
 airflow/decorators/branch_python.py     | 29 ++++------------------------
 airflow/decorators/external_python.py   | 34 ++++-----------------------------
 airflow/decorators/python_virtualenv.py | 31 ++++--------------------------
 airflow/decorators/short_circuit.py     | 31 ++++--------------------------
 4 files changed, 16 insertions(+), 109 deletions(-)

diff --git a/airflow/decorators/branch_python.py b/airflow/decorators/branch_python.py
index 6ba658eb8a..4dcff0a361 100644
--- a/airflow/decorators/branch_python.py
+++ b/airflow/decorators/branch_python.py
@@ -18,37 +18,16 @@ from __future__ import annotations
 
 from typing import Callable
 
-from airflow.decorators.base import DecoratedOperator, TaskDecorator, 
task_decorator_factory
+from airflow.decorators.base import TaskDecorator, task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
 from airflow.operators.python import BranchPythonOperator
 
 
-class _BranchPythonDecoratedOperator(DecoratedOperator, BranchPythonOperator):
-    """
-    Wraps a Python callable and captures args/kwargs when called for execution.
-
-    :param python_callable: A reference to an object that is callable
-    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
-        in your function (templated)
-    :param op_args: a list of positional arguments that will get unpacked when
-        calling your callable (templated)
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. Dict will unroll to xcom values with 
keys as keys.
-        Defaults to False.
-    """
+class _BranchPythonDecoratedOperator(_PythonDecoratedOperator, 
BranchPythonOperator):
+    """Wraps a Python callable and captures args/kwargs when called for 
execution."""
 
     custom_operator_name: str = "@task.branch"
 
-    def __init__(
-        self,
-        **kwargs,
-    ) -> None:
-        kwargs_to_upstream = {
-            "python_callable": kwargs["python_callable"],
-            "op_args": kwargs["op_args"],
-            "op_kwargs": kwargs["op_kwargs"],
-        }
-        super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs)
-
 
 def branch_task(
     python_callable: Callable | None = None, multiple_outputs: bool | None = 
None, **kwargs
diff --git a/airflow/decorators/external_python.py b/airflow/decorators/external_python.py
index 7862e9ec58..1f083a144a 100644
--- a/airflow/decorators/external_python.py
+++ b/airflow/decorators/external_python.py
@@ -18,42 +18,16 @@ from __future__ import annotations
 
 from typing import Callable
 
-from airflow.decorators.base import DecoratedOperator, TaskDecorator, 
task_decorator_factory
+from airflow.decorators.base import TaskDecorator, task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
 from airflow.operators.python import ExternalPythonOperator
 
 
-class _PythonExternalDecoratedOperator(DecoratedOperator, 
ExternalPythonOperator):
-    """
-    Wraps a Python callable and captures args/kwargs when called for execution.
-
-    :param python: Full path string (file-system specific) that points to a 
Python binary inside
-        a virtualenv that should be used (in ``VENV/bin`` folder). Should be 
absolute path
-        (so usually start with "/" or "X:/" depending on the filesystem/os 
used).
-    :param python_callable: A reference to an object that is callable
-    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
-        in your function (templated)
-    :param op_args: a list of positional arguments that will get unpacked when
-        calling your callable (templated)
-    :param multiple_outputs: If set to True, the decorated function's return 
value will be unrolled to
-        multiple XCom values. Dict will unroll to XCom values with its keys as 
XCom keys. Defaults to False.
-    """
+class _PythonExternalDecoratedOperator(_PythonDecoratedOperator, 
ExternalPythonOperator):
+    """Wraps a Python callable and captures args/kwargs when called for 
execution."""
 
     custom_operator_name: str = "@task.external_python"
 
-    def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) -> 
None:
-        kwargs_to_upstream = {
-            "python_callable": python_callable,
-            "op_args": op_args,
-            "op_kwargs": op_kwargs,
-        }
-        super().__init__(
-            kwargs_to_upstream=kwargs_to_upstream,
-            python_callable=python_callable,
-            op_args=op_args,
-            op_kwargs=op_kwargs,
-            **kwargs,
-        )
-
 
 def external_python_task(
     python: str | None = None,
diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py
index c6182b7fb0..123378d43c 100644
--- a/airflow/decorators/python_virtualenv.py
+++ b/airflow/decorators/python_virtualenv.py
@@ -18,39 +18,16 @@ from __future__ import annotations
 
 from typing import Callable
 
-from airflow.decorators.base import DecoratedOperator, TaskDecorator, 
task_decorator_factory
+from airflow.decorators.base import TaskDecorator, task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
 from airflow.operators.python import PythonVirtualenvOperator
 
 
-class _PythonVirtualenvDecoratedOperator(DecoratedOperator, 
PythonVirtualenvOperator):
-    """
-    Wraps a Python callable and captures args/kwargs when called for execution.
-
-    :param python_callable: A reference to an object that is callable
-    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
-        in your function (templated)
-    :param op_args: a list of positional arguments that will get unpacked when
-        calling your callable (templated)
-    :param multiple_outputs: If set to True, the decorated function's return 
value will be unrolled to
-        multiple XCom values. Dict will unroll to XCom values with its keys as 
XCom keys. Defaults to False.
-    """
+class _PythonVirtualenvDecoratedOperator(_PythonDecoratedOperator, 
PythonVirtualenvOperator):
+    """Wraps a Python callable and captures args/kwargs when called for 
execution."""
 
     custom_operator_name: str = "@task.virtualenv"
 
-    def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) -> 
None:
-        kwargs_to_upstream = {
-            "python_callable": python_callable,
-            "op_args": op_args,
-            "op_kwargs": op_kwargs,
-        }
-        super().__init__(
-            kwargs_to_upstream=kwargs_to_upstream,
-            python_callable=python_callable,
-            op_args=op_args,
-            op_kwargs=op_kwargs,
-            **kwargs,
-        )
-
 
 def virtualenv_task(
     python_callable: Callable | None = None,
diff --git a/airflow/decorators/short_circuit.py b/airflow/decorators/short_circuit.py
index 34d35785e6..b6b2de5632 100644
--- a/airflow/decorators/short_circuit.py
+++ b/airflow/decorators/short_circuit.py
@@ -18,39 +18,16 @@ from __future__ import annotations
 
 from typing import Callable
 
-from airflow.decorators.base import DecoratedOperator, TaskDecorator, 
task_decorator_factory
+from airflow.decorators.base import TaskDecorator, task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
 from airflow.operators.python import ShortCircuitOperator
 
 
-class _ShortCircuitDecoratedOperator(DecoratedOperator, ShortCircuitOperator):
-    """
-    Wraps a Python callable and captures args/kwargs when called for execution.
-
-    :param python_callable: A reference to an object that is callable
-    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
-        in your function (templated)
-    :param op_args: a list of positional arguments that will get unpacked when
-        calling your callable (templated)
-    :param multiple_outputs: If set to True, the decorated function's return 
value will be unrolled to
-        multiple XCom values. Dict will unroll to XCom values with its keys as 
XCom keys. Defaults to False.
-    """
+class _ShortCircuitDecoratedOperator(_PythonDecoratedOperator, 
ShortCircuitOperator):
+    """Wraps a Python callable and captures args/kwargs when called for 
execution."""
 
     custom_operator_name: str = "@task.short_circuit"
 
-    def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) -> 
None:
-        kwargs_to_upstream = {
-            "python_callable": python_callable,
-            "op_args": op_args,
-            "op_kwargs": op_kwargs,
-        }
-        super().__init__(
-            kwargs_to_upstream=kwargs_to_upstream,
-            python_callable=python_callable,
-            op_args=op_args,
-            op_kwargs=op_kwargs,
-            **kwargs,
-        )
-
 
 def short_circuit_task(
     python_callable: Callable | None = None,

Reply via email to