This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8f12e7e4a9 Make the decorators of `PythonOperator` sub-classes extend
its decorator (#32845)
8f12e7e4a9 is described below
commit 8f12e7e4a9374e886965f3134aa801a5a267a36d
Author: Hussein Awala <[email protected]>
AuthorDate: Mon Jul 31 22:15:24 2023 +0200
Make the decorators of `PythonOperator` sub-classes extend its decorator
(#32845)
* Fix template_fields in the decorators of PythonOperator subclasses
* Make the decorator classes extend _PythonDecoratedOperator instead of
DecoratedOperator
---
airflow/decorators/branch_python.py | 29 ++++------------------------
airflow/decorators/external_python.py | 34 ++++-----------------------------
airflow/decorators/python_virtualenv.py | 31 ++++--------------------------
airflow/decorators/short_circuit.py | 31 ++++--------------------------
4 files changed, 16 insertions(+), 109 deletions(-)
diff --git a/airflow/decorators/branch_python.py b/airflow/decorators/branch_python.py
index 6ba658eb8a..4dcff0a361 100644
--- a/airflow/decorators/branch_python.py
+++ b/airflow/decorators/branch_python.py
@@ -18,37 +18,16 @@ from __future__ import annotations
from typing import Callable
-from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
+from airflow.decorators.base import TaskDecorator, task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import BranchPythonOperator
-class _BranchPythonDecoratedOperator(DecoratedOperator, BranchPythonOperator):
- """
- Wraps a Python callable and captures args/kwargs when called for execution.
-
- :param python_callable: A reference to an object that is callable
- :param op_kwargs: a dictionary of keyword arguments that will get unpacked
- in your function (templated)
- :param op_args: a list of positional arguments that will get unpacked when
- calling your callable (templated)
- :param multiple_outputs: if set, function return value will be
- unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
- Defaults to False.
- """
+class _BranchPythonDecoratedOperator(_PythonDecoratedOperator, BranchPythonOperator):
+ """Wraps a Python callable and captures args/kwargs when called for execution."""
custom_operator_name: str = "@task.branch"
- def __init__(
- self,
- **kwargs,
- ) -> None:
- kwargs_to_upstream = {
- "python_callable": kwargs["python_callable"],
- "op_args": kwargs["op_args"],
- "op_kwargs": kwargs["op_kwargs"],
- }
- super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs)
-
def branch_task(
python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs
diff --git a/airflow/decorators/external_python.py b/airflow/decorators/external_python.py
index 7862e9ec58..1f083a144a 100644
--- a/airflow/decorators/external_python.py
+++ b/airflow/decorators/external_python.py
@@ -18,42 +18,16 @@ from __future__ import annotations
from typing import Callable
-from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
+from airflow.decorators.base import TaskDecorator, task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import ExternalPythonOperator
-class _PythonExternalDecoratedOperator(DecoratedOperator, ExternalPythonOperator):
- """
- Wraps a Python callable and captures args/kwargs when called for execution.
-
- :param python: Full path string (file-system specific) that points to a Python binary inside
- a virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path
- (so usually start with "/" or "X:/" depending on the filesystem/os used).
- :param python_callable: A reference to an object that is callable
- :param op_kwargs: a dictionary of keyword arguments that will get unpacked
- in your function (templated)
- :param op_args: a list of positional arguments that will get unpacked when
- calling your callable (templated)
- :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
- multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
- """
+class _PythonExternalDecoratedOperator(_PythonDecoratedOperator, ExternalPythonOperator):
+ """Wraps a Python callable and captures args/kwargs when called for execution."""
custom_operator_name: str = "@task.external_python"
- def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) -> None:
- kwargs_to_upstream = {
- "python_callable": python_callable,
- "op_args": op_args,
- "op_kwargs": op_kwargs,
- }
- super().__init__(
- kwargs_to_upstream=kwargs_to_upstream,
- python_callable=python_callable,
- op_args=op_args,
- op_kwargs=op_kwargs,
- **kwargs,
- )
-
def external_python_task(
python: str | None = None,
diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py
index c6182b7fb0..123378d43c 100644
--- a/airflow/decorators/python_virtualenv.py
+++ b/airflow/decorators/python_virtualenv.py
@@ -18,39 +18,16 @@ from __future__ import annotations
from typing import Callable
-from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
+from airflow.decorators.base import TaskDecorator, task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import PythonVirtualenvOperator
-class _PythonVirtualenvDecoratedOperator(DecoratedOperator, PythonVirtualenvOperator):
- """
- Wraps a Python callable and captures args/kwargs when called for execution.
-
- :param python_callable: A reference to an object that is callable
- :param op_kwargs: a dictionary of keyword arguments that will get unpacked
- in your function (templated)
- :param op_args: a list of positional arguments that will get unpacked when
- calling your callable (templated)
- :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
- multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
- """
+class _PythonVirtualenvDecoratedOperator(_PythonDecoratedOperator, PythonVirtualenvOperator):
+ """Wraps a Python callable and captures args/kwargs when called for execution."""
custom_operator_name: str = "@task.virtualenv"
- def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) -> None:
- kwargs_to_upstream = {
- "python_callable": python_callable,
- "op_args": op_args,
- "op_kwargs": op_kwargs,
- }
- super().__init__(
- kwargs_to_upstream=kwargs_to_upstream,
- python_callable=python_callable,
- op_args=op_args,
- op_kwargs=op_kwargs,
- **kwargs,
- )
-
def virtualenv_task(
python_callable: Callable | None = None,
diff --git a/airflow/decorators/short_circuit.py b/airflow/decorators/short_circuit.py
index 34d35785e6..b6b2de5632 100644
--- a/airflow/decorators/short_circuit.py
+++ b/airflow/decorators/short_circuit.py
@@ -18,39 +18,16 @@ from __future__ import annotations
from typing import Callable
-from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
+from airflow.decorators.base import TaskDecorator, task_decorator_factory
+from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import ShortCircuitOperator
-class _ShortCircuitDecoratedOperator(DecoratedOperator, ShortCircuitOperator):
- """
- Wraps a Python callable and captures args/kwargs when called for execution.
-
- :param python_callable: A reference to an object that is callable
- :param op_kwargs: a dictionary of keyword arguments that will get unpacked
- in your function (templated)
- :param op_args: a list of positional arguments that will get unpacked when
- calling your callable (templated)
- :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
- multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
- """
+class _ShortCircuitDecoratedOperator(_PythonDecoratedOperator, ShortCircuitOperator):
+ """Wraps a Python callable and captures args/kwargs when called for execution."""
custom_operator_name: str = "@task.short_circuit"
- def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) -> None:
- kwargs_to_upstream = {
- "python_callable": python_callable,
- "op_args": op_args,
- "op_kwargs": op_kwargs,
- }
- super().__init__(
- kwargs_to_upstream=kwargs_to_upstream,
- python_callable=python_callable,
- op_args=op_args,
- op_kwargs=op_kwargs,
- **kwargs,
- )
-
def short_circuit_task(
python_callable: Callable | None = None,