GitHub user pykenny edited a comment on the discussion: Using nested 
(decorated) function in decorated tasks (plus integration with TaskFlow)

Still tackling the Breeze installation/configuration, but I think being able to 
combine other decorators/wrappers with the existing Airflow task decorators 
could be very useful when one wants to access multiple external services inside 
a single task, instead of having to explicitly instantiate hooks inside the 
function, write a customized task decorator that combines everything together 
(through a provider plugin), or add extra workarounds that break the task into 
several sequential steps (with the corresponding operators).

```python
@task
@hook.mysql(conn_id="my_connection_sql", arg_name="mysql_hook")
@hook.postgres(conn_id="my_connection_pg", arg_name="pg_hook")
@hook.amazon.s3(conn_id="my_connection_s3", arg_name="s3_hook")
def my_task(
    upstream_data_01: dict,
    upstream_data_02: str,
    mysql_hook=None,
    pg_hook=None,
    s3_hook=None,
):
    ...
```

Which is conceptually equivalent to:

```python
@task
def my_task(
    upstream_data_01: dict,
    upstream_data_02: str,
):
    from airflow.providers.mysql.hooks.mysql import MySqlHook
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    mysql_hook = MySqlHook(mysql_conn_id="my_connection_sql")
    pg_hook = PostgresHook(postgres_conn_id="my_connection_pg")
    s3_hook = S3Hook(aws_conn_id="my_connection_s3")
    ...
```
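
For context, a single hook-injecting decorator of this kind could be a fairly 
thin wrapper around the hook class. Here is a minimal sketch (the 
`inject_mysql_hook` name and its `conn_id`/`arg_name` parameters are 
illustrative assumptions, not an existing Airflow API):

```python
import functools

from airflow.providers.mysql.hooks.mysql import MySqlHook


def inject_mysql_hook(conn_id: str, arg_name: str = "mysql_hook"):
    """Hypothetical decorator factory: pass a MySqlHook to the wrapped callable."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Instantiate the hook lazily, i.e. only when the task actually runs.
            if arg_name not in kwargs:
                kwargs[arg_name] = MySqlHook(mysql_conn_id=conn_id)
            return func(*args, **kwargs)
        return wrapper
    return decorator
```

Applied below `@task` (so the hook decorator wraps the plain function and 
`@task` wraps the result), this would behave roughly like the expanded version 
above.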

With that in place, official decorators, third-party provider decorators, and 
user-defined decorators could be used at the same time:

```python
# Say `@task.customized` is registered by some third-party provider
@task.customized(task_id="my_task")
@hook.mysql(conn_id="my_connection_sql", arg_name="mysql_hook")
@hook.postgres(conn_id="my_connection_pg", arg_name="pg_hook")
@hook.amazon.s3(conn_id="my_connection_s3", arg_name="s3_hook")
@default_logger
def my_task(
    upstream_data_01: dict,
    upstream_data_02: str,
    mysql_hook=None,
    pg_hook=None,
    s3_hook=None,
    logger=None,
):
    ...
```
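
`@default_logger` above is meant as a user-defined decorator; one possible 
(purely hypothetical) implementation that injects a standard-library logger as 
the `logger` keyword argument:

```python
import functools
import logging


def default_logger(func):
    """Hypothetical decorator: inject a module-level logger as the `logger` kwarg."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        kwargs.setdefault("logger", logging.getLogger(func.__module__))
        return func(*args, **kwargs)
    return wrapper
```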

Or even rewriting operator classes into a decorator version, built on top of 
these hook decorators:

```python
# "dynamic" resolves connection arguments from decorator arguments
@hook.amazon.s3.dynamic(arg_name="s3_hook")
@default_logger  # Similar to `BaseOperator.log`
def create_bucket(f, s3_hook=None, logger=None, *args, **kwargs):
    def wrapper(*args, **kwargs) -> dict:
        dynamic_configuration = f()
        arguments: dict = resolve_arguments(args, kwargs, dynamic_configuration)
        # bucket_name: str, region_name: str | None

        resolved_bucket_name = arguments["bucket_name"]
        if not s3_hook.check_for_bucket(resolved_bucket_name):
            s3_hook.create_bucket(**arguments)
            logger.info("Created bucket with name: %s", resolved_bucket_name)
        else:
            logger.info("Bucket with name: %s already exists", resolved_bucket_name)

        return arguments

    return wrapper


@hook.amazon.s3.dynamic(arg_name="s3_hook")
@default_logger  # Similar to `BaseOperator.log`
def delete_bucket(f, s3_hook=None, logger=None, *args, **kwargs):
    def wrapper(*args, **kwargs) -> bool:
        dynamic_configuration = f()
        arguments: dict = resolve_arguments(args, kwargs, dynamic_configuration)
        # bucket_name: str, force_delete: bool

        resolved_bucket_name = arguments["bucket_name"]
        exist_flag = s3_hook.check_for_bucket(resolved_bucket_name)

        if exist_flag:
            s3_hook.delete_bucket(**arguments)
            logger.info("Deleted bucket with name: %s", resolved_bucket_name)
        else:
            logger.info("Bucket with name: %s doesn't exist", resolved_bucket_name)

        return exist_flag

    return wrapper


@task(task_id="create_my_bucket")
@create_bucket(aws_conn_id="my_aws_conn")
def create_my_bucket():
    from my_package import settings

    region_name = settings.AWS.DEFAULT_REGION

    return {"bucket_name": "my_bucket", "region_name": region_name}


@task(task_id="delete_my_bucket")
@delete_bucket(aws_conn_id="my_aws_conn")
def delete_my_bucket(): pass


create_task = create_my_bucket()
delete_task = delete_my_bucket(bucket_name=create_task["bucket_name"], force_delete=True)

```
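
`resolve_arguments` is left undefined above; the assumption here is that it 
merges the dynamic configuration returned by the decorated function with the 
call-site keyword arguments. A minimal sketch of that assumed behavior (the 
precedence rule is just a guess, not part of the proposal):

```python
def resolve_arguments(args: tuple, kwargs: dict, dynamic_configuration: dict | None) -> dict:
    """Hypothetical helper: call-site kwargs take precedence over the dynamic configuration."""
    # Positional arguments are ignored in this sketch.
    resolved = dict(dynamic_configuration or {})
    resolved.update(kwargs)
    return resolved
```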

GitHub link: https://github.com/apache/airflow/discussions/45963#discussioncomment-11938206
