casassg commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r441013469
##########
File path: docs/concepts.rst
##########
@@ -116,6 +116,47 @@ DAGs can be used as context managers to automatically assign new operators to th
op.dag is dag # True
+.. _concepts:functional_dags:
+
+Functional DAGs
+---------------
+*Added in Airflow 1.10.11*
+
+DAGs can be defined using functional abstractions. Outputs and inputs are sent between tasks using
+:ref:`XComs <concepts:xcom>` values. In addition, you can wrap functions as tasks using the
+:ref:`task decorator <concepts:task_decorator>`. Dependencies are automatically inferred from
+the message dependencies.
+
+Example DAG with functional abstraction
+
+.. code:: python
+
+  with DAG(
+      'send_server_ip', default_args=default_args, schedule_interval=None
+  ) as dag:
+
+      # Using default connection as it's set to httpbin.org by default
+      get_ip = SimpleHttpOperator(
+          task_id='get_ip', endpoint='get', method='GET', xcom_push=True
+      )
+
+      @dag.task(multiple_outputs=True)
+      def prepare_email(raw_json: str) -> str:
+          external_ip = json.loads(raw_json)['origin']
+          return {
+              'subject': f'Server connected from {external_ip}',
+              'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
+          }
+
+      email_info = prepare_email(get_ip.output)
Review comment:
The `output` part was already implemented in a previous PR. This PR focuses
on `@task` only.
The main difference between the proposed approach and the current one is that,
in the current approach, we do not need to call normal operators. This should
reduce the complexity of creating functional DAGs.
In addition, existing operators may have mandatory templated fields (for
example, `subject` is required when initializing `EmailOperator`). This means
we would need to overwrite them on call, which is a bit confusing (see the old
example below or the example in the AIP).
`output` is basically a way to access the XComArg of non-callable operators.
This allows a smoother definition of functional DAGs that mix normal operators
and `@task` operators.
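To make the idea concrete, here is a toy sketch in plain Python. It is not Airflow's implementation: `XComRef` and `Task` are hypothetical names standing in for XComArg and an operator, and the only assumption is that an operator can spot such references among its constructor arguments and record the upstream task.

```python
class XComRef:
    """Toy stand-in (hypothetical) for a lazy reference to a task's pushed value."""

    def __init__(self, task, key="return_value"):
        self.task = task
        self.key = key

    def __getitem__(self, key):
        # Indexing picks one named output of the same upstream task.
        return XComRef(self.task, key)


class Task:
    """Toy operator (hypothetical): infers upstream tasks from its arguments."""

    def __init__(self, task_id, **kwargs):
        self.task_id = task_id
        self.kwargs = kwargs
        # Any argument that is a reference implies a dependency on its task.
        self.upstream = {
            value.task.task_id
            for value in kwargs.values()
            if isinstance(value, XComRef)
        }

    @property
    def output(self):
        # No call is needed: the reference is available on the instance.
        return XComRef(self)


get_ip = Task("get_ip")
prepare_email = Task("prepare_email", raw_json=get_ip.output)
email_info = prepare_email.output
send_email = Task(
    "send_email",
    subject=email_info["subject"],
    html_content=email_info["body"],
)

print(sorted(prepare_email.upstream))  # ['get_ip']
print(sorted(send_email.upstream))     # ['prepare_email']
```

In this sketch the mandatory fields are passed once, at construction time, and the dependency falls out of the arguments, so nothing has to be overwritten later through a call.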
Old example:
```python
get_ip = SimpleHttpOperator(
    task_id='get_ip', endpoint='get', method='GET', xcom_push=True
)

@dag.task(multiple_outputs=True)
def prepare_email(raw_json: str) -> Dict[str, str]:
    external_ip = json.loads(raw_json)['origin']
    return {
        'subject': f'Server connected from {external_ip}',
        'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
    }

server_info = get_ip()
email_info = prepare_email(server_info)
send_email = EmailOperator(
    task_id='send_email',
    to='[email protected]',
    subject='',
    html_content=''
)
send_email(subject=email_info['subject'], html_content=email_info['body'])
```
New approach:
```python
get_ip = SimpleHttpOperator(
    task_id='get_ip', endpoint='get', method='GET', xcom_push=True
)

@dag.task(multiple_outputs=True)
def prepare_email(raw_json: str) -> Dict[str, str]:
    external_ip = json.loads(raw_json)['origin']
    return {
        'subject': f'Server connected from {external_ip}',
        'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
    }

email_info = prepare_email(get_ip.output)
send_email = EmailOperator(
    task_id='send_email',
    to='[email protected]',
    subject=email_info['subject'],
    html_content=email_info['body']
)
```
I do believe the new approach is better. I'm happy to update the AIP and
submit it for a vote if that is required.