GitHub user pykenny edited a discussion: Using nested (decorated) function in
decorated tasks (plus integration with TaskFlow)
I'm planning to design several custom function decorators that append extra
context to the task function and, if possible, combine multiple contexts by
nesting decorators before applying the `@task` decorator.
The code snippet below hasn't been tested in my development environment, but it
roughly describes what I'm attempting:
```python
from logging import Logger
from functools import wraps

from airflow.decorators import dag, task


def default_logger(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        from logging import getLogger

        logger = getLogger("airflow.task")
        return f(*args, logger=logger, **kwargs)

    return wrapper


def load_dag_storage(serialize_output: bool = False):
    # Decorator factory: the outer call takes options, the inner one takes f.
    def decorator(f):
        @wraps(f)
        def wrapper(dag_id: str, *args, **kwargs):
            from my_package.dag_storage import (
                get_dag_storage_data,
                serialize_storage_data,
            )

            storage_data = get_dag_storage_data(dag_id=dag_id)
            task_output = f(*args, storage_data=storage_data, **kwargs)
            if serialize_output:
                task_output = serialize_storage_data(task_output)
            return task_output

        return wrapper

    return decorator


DAG_ID = "my_dag"


@dag(dag_id=DAG_ID, ...)
def my_dag():
    @task(task_id="upstream_task")
    @default_logger
    def upstream_task(logger: Logger = None) -> dict:
        data: dict
        logger.info("Processing upstream data...")
        # Obtain data...
        return data

    @task(task_id="my_task")
    @load_dag_storage(serialize_output=True)
    @default_logger
    def my_task(
        upstream_data: dict, logger: Logger = None, storage_data: dict = None
    ):
        logger.info("Running my task...")
        result: dict
        # Do some work with storage_data and upstream_data,
        # then generate `result`
        logger.info("Done!")
        return result

    my_task(upstream_data=upstream_task(), dag_id=DAG_ID)


dag = my_dag()
```
In the example above, the `@default_logger` decorator initializes the
`airflow.task` logger and passes it to the wrapped function as the `logger`
parameter, while the `@load_dag_storage` decorator accesses some storage
indexed by Airflow DAG ID, with an optional argument indicating whether to
serialize the output returned from the wrapped function. Additionally,
`my_task` accepts a mix of fixed parameters and TaskFlow/XCom results as
arguments.
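To illustrate the nesting I have in mind without the Airflow dependency, here is a minimal, self-contained sketch of the same pattern: one plain decorator that injects a keyword argument, plus one decorator factory (the names `inject_logger`, `with_storage`, and the fake storage lookup are made up for the example).

```python
from functools import wraps


def inject_logger(f):
    """Inject a `logger` keyword argument into the wrapped function."""
    @wraps(f)
    def wrapper(*args, **kwargs):
        import logging

        kwargs.setdefault("logger", logging.getLogger("demo.task"))
        return f(*args, **kwargs)

    return wrapper


def with_storage(serialize_output: bool = False):
    """Decorator factory: the outer call takes options, the inner takes f."""
    def decorator(f):
        @wraps(f)
        def wrapper(key: str, *args, **kwargs):
            # Stand-in for a real storage lookup, keyed by `key`.
            kwargs.setdefault("storage_data", {"source": key})
            result = f(*args, **kwargs)
            return repr(result) if serialize_output else result

        return wrapper

    return decorator


@with_storage(serialize_output=True)
@inject_logger
def work(value: int, logger=None, storage_data=None) -> dict:
    logger.info("working on %s", value)
    return {"value": value, "source": storage_data["source"]}


print(work.__name__)  # -> "work", thanks to functools.wraps
print(work("my_key", value=3))
```

Because both wrappers use `functools.wraps`, the stacked `work` keeps its original `__name__` and docstring, which I assume matters for how `@task` derives task metadata from the decorated callable.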
Are there any general guidelines for using decorators (or wrapped functions,
inner functions, closures, etc.) when writing decorated task functions?
GitHub link: https://github.com/apache/airflow/discussions/45963