GitHub user pykenny edited a discussion: Using nested (decorated) function in 
decorated tasks (plus integration with TaskFlow)

I'm planning to design several customized function decorators that append extra 
context to the task function and, if possible, to combine multiple contexts by 
nesting decorated functions together before passing the result to the `@task` 
decorator.

The code snippet below hasn't been tested in my development environment, but it 
roughly describes what I'm attempting:

```python
from typing import Any
from logging import Logger
from functools import wraps

from airflow.decorators import dag, task


def default_logger(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        from logging import getLogger

        logger = getLogger("airflow.task")

        return f(*args, logger=logger, **kwargs)

    return wrapper

def load_dag_storage(f, serialize_output: bool = False):
    @wraps(f)
    def wrapper(dag_id: str, *args, **kwargs):
        from my_package.dag_storage import get_dag_storage_data, serialize_storage_data

        storage_data = get_dag_storage_data(dag_id=dag_id)
        task_output = f(*args, storage_data=storage_data, **kwargs)
        if serialize_output:
            task_output = serialize_storage_data(task_output)

        return task_output

    return wrapper


DAG_ID = "my_dag"


@dag(dag_id=DAG_ID, ...)
def my_dag():
    @task(task_id="upstream_task")
    @default_logger
    def upstream_task(logger: Logger = None) -> dict:
        logger.info("Processing upstream data...")
        # Obtain data...
        data: dict = {}

        return data

    @task(task_id="my_task")
    @load_dag_storage
    @default_logger
    def my_task(upstream_data: dict, logger: Logger = None, storage_data: dict = None):
        logger.info("Running my task...")
        # Do some work with storage_data and upstream_data...
        logger.info("Done!")

    my_task(upstream_data=upstream_task(), dag_id=DAG_ID)


dag = my_dag()
```

In the example above, the `@default_logger` decorator initializes the 
`airflow.task` logger and passes it to the wrapped function as the `logger` 
parameter, while the `@load_dag_storage` decorator accesses some storage 
indexed by the Airflow DAG ID, with an optional argument indicating whether to 
serialize the output returned from the wrapped function. Additionally, 
`my_task` accepts a mix of fixed parameters and TaskFlow/XCom values as 
arguments.
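
(Side note: since `load_dag_storage` takes both the wrapped function and a 
`serialize_output` flag, I understand the conventional way to make such a flag 
configurable at the decoration site is a decorator factory. A rough sketch with 
stand-in names — `repr` here is just a placeholder for my real 
`serialize_storage_data`:

```python
import functools


def load_dag_storage(serialize_output: bool = False):
    """Decorator factory: call it with options, it returns the actual decorator."""

    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            result = f(*args, **kwargs)
            if serialize_output:
                # Stand-in for a real serializer such as serialize_storage_data.
                result = repr(result)
            return result

        return wrapper

    return decorator


@load_dag_storage(serialize_output=True)
def produce() -> dict:
    return {"a": 1}


print(produce())  # "{'a': 1}"
```

Note the parentheses in `@load_dag_storage(serialize_output=True)` — the 
factory is called first, and the returned `decorator` is what actually wraps 
the function.)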


Any general guidelines for using decorators (or wrapped functions, inner 
functions, closures, etc.) when writing decorated task functions?
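
For reference, here's a minimal plain-Python sketch of the nesting behavior I'm 
after (no Airflow involved; the injected values are hypothetical stand-ins). 
Each decorator injects one keyword argument, and `functools.wraps` keeps the 
innermost function's metadata intact through both layers:

```python
import functools


def with_logger(f):
    """Inject a `logger` keyword argument into the wrapped function."""

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        logger = "fake-logger"  # stand-in for logging.getLogger("airflow.task")
        return f(*args, logger=logger, **kwargs)

    return wrapper


def with_storage(f):
    """Consume a `dag_id` keyword argument and inject `storage_data`."""

    @functools.wraps(f)
    def wrapper(*args, dag_id, **kwargs):
        storage_data = {"dag_id": dag_id}  # stand-in for a real storage lookup
        return f(*args, storage_data=storage_data, **kwargs)

    return wrapper


@with_storage
@with_logger
def my_func(value, logger=None, storage_data=None):
    return (value, logger, storage_data)


print(my_func(41, dag_id="my_dag"))
# (41, 'fake-logger', {'dag_id': 'my_dag'})
print(my_func.__name__)  # my_func
```

Decorators apply bottom-up, so the caller-facing signature is the one exposed 
by the outermost wrapper (`with_storage`), which is why `dag_id` must be 
supplied at the call site.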

GitHub link: https://github.com/apache/airflow/discussions/45963
