ashb commented on a change in pull request #17270:
URL: https://github.com/apache/airflow/pull/17270#discussion_r677794935
##########
File path: airflow/decorators/__init__.py
##########
@@ -41,103 +43,13 @@ def __call__(
"""
return self.python(python_callable=python_callable,
multiple_outputs=multiple_outputs, **kwargs)
- @staticmethod
- def python(python_callable: Optional[Callable] = None, multiple_outputs:
Optional[bool] = None, **kwargs):
- """
- Python operator decorator. Wraps a function into an Airflow operator.
- Accepts kwargs for operator kwarg. This decorator can be reused in a
single DAG.
-
- :param python_callable: Function to decorate
- :type python_callable: Optional[Callable]
- :param multiple_outputs: if set, function return value will be
- unrolled to multiple XCom values. List/Tuples will unroll to xcom
values
- with index as key. Dict will unroll to xcom values with keys as
XCom keys.
- Defaults to False.
- :type multiple_outputs: bool
- """
- return python_task(python_callable=python_callable,
multiple_outputs=multiple_outputs, **kwargs)
-
- @staticmethod
- def virtualenv(
- python_callable: Optional[Callable] = None,
- multiple_outputs: Optional[bool] = None,
- requirements: Optional[Iterable[str]] = None,
- python_version: Optional[Union[str, int, float]] = None,
- use_dill: bool = False,
- system_site_packages: bool = True,
- string_args: Optional[Iterable[str]] = None,
- templates_dict: Optional[Dict] = None,
- templates_exts: Optional[List[str]] = None,
- **kwargs,
- ):
- """
- Allows one to run a function in a virtualenv that is
- created and destroyed automatically (with certain caveats).
-
- The function must be defined using def, and not be
- part of a class. All imports must happen inside the function
- and no variables outside of the scope may be referenced. A global scope
- variable named virtualenv_string_args will be available (populated by
- string_args). In addition, one can pass stuff through op_args and
op_kwargs, and one
- can use a return value.
- Note that if your virtualenv runs in a different Python major version
than Airflow,
- you cannot use return values, op_args, op_kwargs, or use any macros
that are being provided to
- Airflow through plugins. You can use string_args though.
-
- .. seealso::
- For more information on how to use this operator, take a look at
the guide:
- :ref:`howto/operator:PythonVirtualenvOperator`
-
- :param python_callable: A python function with no references to
outside variables,
- defined with def, which will be run in a virtualenv
- :type python_callable: function
- :param multiple_outputs: if set, function return value will be
- unrolled to multiple XCom values. List/Tuples will unroll to xcom
values
- with index as key. Dict will unroll to xcom values with keys as
XCom keys.
- Defaults to False.
- :type multiple_outputs: bool
- :param requirements: A list of requirements as specified in a pip
install command
- :type requirements: list[str]
- :param python_version: The Python version to run the virtualenv with.
Note that
- both 2 and 2.7 are acceptable forms.
- :type python_version: Optional[Union[str, int, float]]
- :param use_dill: Whether to use dill to serialize
- the args and result (pickle is default). This allow more complex
types
- but requires you to include dill in your requirements.
- :type use_dill: bool
- :param system_site_packages: Whether to include
- system_site_packages in your virtualenv.
- See virtualenv documentation for more information.
- :type system_site_packages: bool
- :param op_args: A list of positional arguments to pass to
python_callable.
- :type op_args: list
- :param op_kwargs: A dict of keyword arguments to pass to
python_callable.
- :type op_kwargs: dict
- :param string_args: Strings that are present in the global var
virtualenv_string_args,
- available to python_callable at runtime as a list[str]. Note that
args are split
- by newline.
- :type string_args: list[str]
- :param templates_dict: a dictionary where the values are templates that
- will get templated by the Airflow engine sometime between
- ``__init__`` and ``execute`` takes place and are made available
- in your callable's context after the template has been applied
- :type templates_dict: dict of str
- :param templates_exts: a list of file extensions to resolve while
- processing templated fields, for examples ``['.sql', '.hql']``
- :type templates_exts: list[str]
- """
- return _virtualenv_task(
- python_callable=python_callable,
- multiple_outputs=multiple_outputs,
- requirements=requirements,
- python_version=python_version,
- use_dill=use_dill,
- system_site_packages=system_site_packages,
- string_args=string_args,
- templates_dict=templates_dict,
- templates_exts=templates_exts,
- **kwargs,
- )
+ def __getattr__(self, name):
+ if self.store.get(name, None):
+ return self.store[name]
+ connections = [e for e in
metadata.entry_points()['task_decorator_connections'] if e.name == name]
Review comment:
Entry point group names share one global namespace across all installed Python
packages, so this group should be called something like `airflow.task_decorators`
(`console_scripts` is a "core" group, but the ones we create should be good
citizens and scope themselves appropriately).
Additionally, every time we call `metadata.entry_points()` it has to read
possibly hundreds of files, so we should think about caching/preloading the
_entry point names_ (but not the code itself). This matters especially because
DAG parsing happens in a subprocess, so the entry point information would need
to be loaded (warmed) in the parent before forking/creating the process pool.
##########
File path: airflow/decorators/__init__.py
##########
@@ -15,15 +15,17 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Callable, Dict, Iterable, List, Optional, Union
+from typing import Callable, Optional
+
+import importlib_metadata as metadata
-from airflow.decorators.python import python_task
-from airflow.decorators.python_virtualenv import _virtualenv_task
-from airflow.decorators.task_group import task_group # noqa
Review comment:
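This one is not accessible via `@task.task_group`; it is re-exported at module
level (hence the `# noqa`), so removing this import breaks
`from airflow.decorators import task_group` -- I think you were
over-zealous in your removal of stuff :D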
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]