GitHub user pykenny edited a comment on the discussion: Using nested
(decorated) function in decorated tasks (plus integration with TaskFlow)
Still tackling Breeze installation/configuration; however, I think being
able to combine other decorators/wrappers with existing Airflow task decorators
can be very useful when one wants to access multiple external services inside
one task, instead of having to explicitly call hooks inside the function,
write a customized task decorator to combine all the things together
(through a provider plugin), or add extra workarounds to break the task into
several sequential steps (with corresponding operators).
```python
@task
@hook.mysql(conn_id="my_connection_sql", arg_name="mysql_hook")
@hook.postgres(conn_id="my_connection_pg", arg_name="pg_hook")
@hook.amazon.s3(conn_id="my_connection_s3", arg_name="s3_hook")
def my_task(
upstream_data_01: dict,
upstream_data_02: str,
mysql_hook = None,
pg_hook = None,
s3_hook = None
):
...
```
Which is conceptually equivalent to:
```python
@task
def my_task(
upstream_data_01: dict,
upstream_data_02: str,
):
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
mysql_hook = MySqlHook("my_connection_sql")
pg_hook = PostgresHook("my_connection_pg")
s3_hook = S3Hook("my_connection_s3")
...
```
With that possible, official decorators, third-party provider decorators, and
user-defined decorators can be used at the same time:
```python
@task.customized(task_id="my_task") # Say `@task.customized` is registered by
some third-party provider
@hook.mysql(conn_id="my_connection_sql", arg_name="mysql_hook")
@hook.postgres(conn_id="my_connection_pg", arg_name="pg_hook")
@hook.amazon.s3(conn_id="my_connection_s3", arg_name="s3_hook")
@default_logger
def my_task(
upstream_data_01: dict,
upstream_data_02: str,
mysql_hook = None,
pg_hook = None,
s3_hook = None,
logger = None,
):
...
```
Or even rewriting operator classes into decorator version, with decorators:
```python
# "dynamic" resolves connection arguments from decorator arguments
@hook.amazon.s3.dynamic(arg_name="s3_hook")
@default_logger # Similar to `BaseOperator.log`
def create_bucket(f, s3_hook = None, logger = None, *args, **kwargs) -> dict:
def callable(*args, **kwargs):
dynamic_configuration = f()
arguments: dict = resolve_arguments(args, kwargs, dynamic_configuration)
# bucket_name: str, region_name: str | None
resolved_bucket_name = arguments["bucket_name"]
if not s3_hook.check_for_bucket(resolved_bucket_name):
s3_hook.create_bucket(**arguments)
logger.info("Created bucket with name: %s", resolved_bucket_name)
else:
logger.info("Bucket with name: %s already exists",
resolved_bucket_name)
return arguments
return callable
@hook.amazon.s3.dynamic(arg_name="s3_hook")
@default_logger # Similar to `BaseOperator.log`
def delete_bucket(f, s3_hook = None, logger = None, *args, **kwargs) ->
bool:
def callable(*args, **kwargs):
dynamic_configuration = f()
arguments: dict = resolve_arguments(args, kwargs, dynamic_configuration)
# bucket_name: str, force_delete: bool
resolved_bucket_name = arguments["bucket_name"]
exist_flag = s3_hook.check_for_bucket(resolved_bucket_name)
if s3_hook.check_for_bucket(resolved_bucket_name):
s3_hook.delete_bucket(**arguments)
logger.info("Deleted bucket with name: %s", resolved_bucket_name)
else:
logger.info("Bucket with name: %s doesn't exist",
resolved_bucket_name)
return exist_flag
return callable
@task(task_id="create_my_bucket")
@create_bucket(aws_conn_id="my_aws_conn")
def create_my_bucket():
from my_package import settings
region_name = settings.AWS.DEFAULT_REGION
aws_conn_id = settings.AWS_CONN_ID
return {"bucket_name": "my_bucket", "region_name": region_name}
@task(task_id="delete_my_bucket")
@delete_bucket(aws_conn_id="my_aws_conn")
def delete_my_bucket(): pass
create_task = create_my_bucket()
delete_task = delete_my_bucket(bucket_name=create_task["bucket_name"],
force_delete=True)
```
GitHub link:
https://github.com/apache/airflow/discussions/45963#discussioncomment-11938206
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]