GitHub user pykenny edited a discussion: Using nested (decorated) function in 
decorated tasks (plus integration with TaskFlow)

I'm planning to design several custom function decorators that each append some 
extra context to the task function and, if possible, to combine multiple 
contexts by stacking several of these decorators before passing the result to 
the `@task` decorator.

The code snippet below has not been tested in my development environment, but 
it roughly describes what I'm attempting:

```python
from logging import Logger
from functools import wraps

from airflow.decorators import dag, task


def default_logger(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        from logging import getLogger

        logger = getLogger("airflow.task")

        # Inject the logger as a keyword argument
        return f(*args, logger=logger, **kwargs)

    return wrapper


def load_dag_storage(f):
    @wraps(f)
    def wrapper(dag_id: str, *args, **kwargs):
        from my_package.dag_storage import get_dag_storage_data

        storage_data = get_dag_storage_data(dag_id=dag_id)
        # (Process storage_data...)

        # dag_id is consumed here; the wrapped function receives storage_data
        return f(*args, storage_data=storage_data, **kwargs)

    return wrapper


DAG_ID = "my_dag"


@dag(dag_id=DAG_ID, ...)
def my_dag():
    @task(task_id="upstream_task")
    @default_logger
    def upstream_task(logger: Logger = None) -> dict:
        data: dict = {}

        logger.info("Processing upstream data...")
        # Obtain data...

        return data

    @task(task_id="my_task")
    @load_dag_storage
    @default_logger
    def my_task(
        upstream_data: dict,
        logger: Logger = None,
        storage_data: dict = None,
    ):
        logger.info("Running my task...")
        # Do some work with storage_data and upstream_data...
        logger.info("Done!")

    my_task(upstream_data=upstream_task(), dag_id=DAG_ID)


dag = my_dag()
```

In the example above, the `@default_logger` decorator obtains the 
`airflow.task` logger and passes it to the wrapped function as the `logger` 
parameter, and the `@load_dag_storage` decorator tries to access some storage 
indexed by Airflow DAG ID. Additionally, `my_task` accepts a mix of fixed 
parameters and TaskFlow/XCom values as arguments.
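
To make the intended stacking concrete, here is a minimal sketch that runs without Airflow. The in-memory `FAKE_STORAGE` dict is a hypothetical stand-in for `my_package.dag_storage`, and the `@task` layer is left out entirely, so this only shows how the two wrappers compose as plain Python, not how TaskFlow treats them:

```python
import inspect
import logging
from functools import wraps

# Hypothetical stand-in for the storage backend; not a real API.
FAKE_STORAGE = {"my_dag": {"bucket": "example"}}


def default_logger(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        # Inject the logger as a keyword argument.
        return f(*args, logger=logging.getLogger("airflow.task"), **kwargs)

    return wrapper


def load_dag_storage(f):
    @wraps(f)
    def wrapper(dag_id, *args, **kwargs):
        # Consume dag_id here and pass the looked-up data down instead.
        return f(*args, storage_data=FAKE_STORAGE[dag_id], **kwargs)

    return wrapper


@load_dag_storage
@default_logger
def my_task(upstream_data, logger=None, storage_data=None):
    logger.info("Running my task...")
    return {
        "upstream": upstream_data,
        "storage": storage_data,
        "logger": logger.name,
    }


result = my_task("my_dag", upstream_data={"rows": 3})

# functools.wraps sets __wrapped__, so inspect.signature reports the
# innermost function's parameters, which do not include dag_id.
visible_params = list(inspect.signature(my_task).parameters)
```

One thing this makes visible: because `wraps` sets `__wrapped__`, signature inspection on the stacked function sees only `upstream_data`, `logger`, and `storage_data`, even though the outer wrapper also expects `dag_id`. Whether that mismatch matters once `@task` is applied on top is part of what I'm unsure about.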


Are there any general guidelines for using decorators (or wrapped functions, 
inner functions, closures, etc.) when writing decorated task functions?

GitHub link: https://github.com/apache/airflow/discussions/45963
