yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-688778222
> I love this change! This DAG representation is nice and clean. I'm just
> wondering if this will work with AIP-31 approach (`@task` decorator). Especially
>
> ```python
> @task
> def my_task():
>     ...
>
> with TaskGroup(...) as tg:
>     ...
>
> r = my_task()
> r >> tg
> ```

Hi @turbaszek, thanks for the detailed review. I have updated the PR
accordingly.

Regarding the question about AIP-31, I just verified that it works like a charm.
The call to `my_task()` needs to be indented so that it sits inside the
`TaskGroup` context manager, i.e. something like this:
```python
@task
def my_task():
    ...

with TaskGroup(...) as tg:
    ...
    r = my_task()

r >> tg
```
Looking at the implementation of the `@task` decorator, it's no surprise
that this works: under the hood the decorator runs the line shown below, which
constructs an operator. As long as that happens within the context of a
`TaskGroup`, it works fine. I think passing `task_group` as an argument to the
decorator would also work, although the code would not look as elegant.
```python
...
op = _PythonFunctionalOperator(python_callable=f, op_args=args,
                               op_kwargs=f_kwargs,
                               multiple_outputs=multiple_outputs, **kwargs)
```
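
To make the mechanism concrete, here is a minimal, Airflow-free sketch of the same pattern. `Group`, `Operator`, and this toy `task` decorator are hypothetical names invented for illustration; they are not Airflow's classes and only approximate the idea. The assumption being modelled is that the operator constructor looks up whichever group is currently active (or takes one explicitly) and prefixes its `task_id` with the group's id.

```python
from typing import List, Optional


class Group:
    """Hypothetical stand-in for a task group (not Airflow's TaskGroup)."""

    _active: List["Group"] = []  # stack of groups entered via `with`

    def __init__(self, group_id: str):
        # A group created inside another group is nested under it.
        parent = Group.current()
        self.group_id = f"{parent.group_id}.{group_id}" if parent else group_id

    @classmethod
    def current(cls) -> Optional["Group"]:
        return cls._active[-1] if cls._active else None

    def __enter__(self) -> "Group":
        Group._active.append(self)
        return self

    def __exit__(self, *exc) -> None:
        Group._active.pop()


class Operator:
    """Hypothetical operator whose id is prefixed with its group's id."""

    def __init__(self, task_id: str, group: Optional[Group] = None):
        # Use the explicit group if given, else whichever group is active.
        group = group or Group.current()
        self.task_id = f"{group.group_id}.{task_id}" if group else task_id


def task(func):
    """Toy decorator: each call builds an Operator named after `func`.

    The wrapped function is never executed here; only the id logic is modelled.
    """
    def factory(*args, group: Optional[Group] = None, **kwargs) -> Operator:
        return Operator(task_id=func.__name__, group=group)
    return factory


@task
def hello_world(value):
    print(value)


with Group("section_1") as section_1:
    a = hello_world("x")  # created while section_1 is active

with Group("section_2") as section_2:
    with Group("inner_section_2") as inner_section_2:
        b = hello_world("y")  # created inside the nested group

c = hello_world("z")                   # no active group
d = hello_world("w", group=section_2)  # group passed explicitly

print(a.task_id)  # section_1.hello_world
print(b.task_id)  # section_2.inner_section_2.hello_world
print(c.task_id)  # hello_world
print(d.task_id)  # section_2.hello_world
```

In this sketch the same decorated function yields a distinct id in each group simply because the constructor runs while a different group is active. The explicit `group=` argument gives the same result but has to be repeated at every call site.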
Here's a fully working example, slightly modified from `example_task_group`.
Notice how the `hello_world` task appears in two groups. The underlying
`task_id`s are different, as shown in the Tree View.
```python
from airflow.models.dag import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import task, PythonOperator


@task
def hello_world(value):
    print(value)


# [START howto_task_group]
with DAG(dag_id="example_task_group2", start_date=days_ago(2)) as dag:
    start = DummyOperator(task_id="start")

    # [START howto_task_group_section_1]
    with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1:
        task_1 = DummyOperator(task_id="task_1")
        task_2 = DummyOperator(task_id="task_2")
        task_3 = PythonOperator(task_id="task_3", python_callable=lambda: "task_3 output")
        hello_world(task_3.output)

        task_1 >> [task_2, task_3]
    # [END howto_task_group_section_1]

    # [START howto_task_group_section_2]
    with TaskGroup("section_2", tooltip="Tasks for section_2") as section_2:
        task_1 = DummyOperator(task_id="task_1")

        # [START howto_task_group_inner_section_2]
        with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
            task_2 = DummyOperator(task_id="task_2")
            task_3 = PythonOperator(task_id="task_3", python_callable=lambda: "task_3 output")
            task_hello = hello_world(task_3.output)
        # [END howto_task_group_inner_section_2]
    # [END howto_task_group_section_2]

    end = DummyOperator(task_id='end')

    start >> section_1 >> section_2 >> end
# [END howto_task_group]
```

