shahar1 commented on issue #55759:
URL: https://github.com/apache/airflow/issues/55759#issuecomment-3313707766

   > > I could be way off here, but I haven't seen folks `return` something from a `@task_group` before... My guess is that's the issue?
   > 
   > You definitely can return something from a task group. The issue is that you can't map again (expand) over the output of a task group. Maybe that is by design? I thought it was a bug because I expected the task group to be equivalent to expanding the functions one after the other, where I could then use the output to map again (this is also the workaround I'm currently using).
   > 
   > So this works (it will log the list as one value):
   >
   >     log_num(text="Got numbers: ", num=_processed_numbers)
   
   This is indeed the current design; see this line:
   https://github.com/apache/airflow/blob/a0d5b7121723cb4a2271d1d684f639aa5cc127c1/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1344
   
   You could work around it by adding an intermediate step that collects the results and then mapping over them:
   ```python
   import logging
   from datetime import datetime

   import pendulum
   from airflow.sdk import dag, task, task_group  # Airflow 3 Task SDK imports

   logger = logging.getLogger(__name__)


   @dag(
       dag_id="test_dag",
       dag_display_name="test dynamic task group mapping",
       schedule=None,
       start_date=datetime(
           2021, 6, 27, 0, 0, 0, tzinfo=pendulum.timezone("Europe/Berlin")
       ),
       catchup=False,
       description="1.0.0",
   )
   def test_dag():
       @task
       def get_nums():
           return [1, 2, 3, 4, 5, 6, 7, 8]

       @task
       def times_2(num):
           return num * 2

       @task
       def add_10(num):
           return num + 10

       @task
       def log_num(text, nums):  # <- now it collects all numbers from the previous step
           nums = list(nums)  # materialize the upstream mapped results into a plain list
           logger.info(f"{text}{nums}")
           return nums  # <- pass through for further processing

       @task_group
       def process_number(num):
           _times_2 = times_2(num)
           _add_10 = add_10(_times_2)
           return _add_10

       _get_nums = get_nums()
       _processed_numbers = process_number.expand(num=_get_nums)

       # Fan in: one non-mapped task receives the results of every mapped group instance
       nums = log_num(text="Got numbers: ", nums=_processed_numbers)

       # Fan out again over the collected list
       process_number.expand(num=nums)


   instance = test_dag()
   ```
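
To make the data flow of the workaround concrete, here is a plain-Python sketch, not Airflow code. `expand` and `collect` are hypothetical helpers standing in for `.expand()` and the intermediate `log_num` step, and `process_number` is an ordinary function mirroring the task group above. It only models the shape: fan out over the input, fan in to a single list, then fan out again over that list.

```python
from typing import Callable, Iterable, List


def times_2(num: int) -> int:
    return num * 2


def add_10(num: int) -> int:
    return num + 10


def process_number(num: int) -> int:
    # Stand-in for the task group body: times_2 then add_10 on a single element.
    return add_10(times_2(num))


def expand(fn: Callable[[int], int], items: Iterable[int]) -> List[int]:
    # Hypothetical model of .expand(): one call per element, results kept in input order.
    return [fn(item) for item in items]


def collect(results: Iterable[int]) -> List[int]:
    # Hypothetical model of the intermediate non-mapped step (log_num above):
    # a single step that receives every mapped result and returns them as one list.
    return list(results)


first_pass = expand(process_number, [1, 2, 3, 4, 5, 6, 7, 8])  # fan out
# first_pass == [12, 14, 16, 18, 20, 22, 24, 26]
collected = collect(first_pass)  # fan in
second_pass = expand(process_number, collected)  # fan out again over the collected list
# second_pass == [34, 38, 42, 46, 50, 54, 58, 62]
```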
   
   I'm aware that this is suboptimal at high scale, but it's the best you can do at the moment :)
   I'll close the issue for now as `Won't fix`, since this is expected behavior. You're welcome to help improve the feature to support it, though I'd warn that dynamic task mapping (DTM) can be a complicated area for new contributors.
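
A rough way to see why the extra collect step costs more at scale: as I understand it, the downstream non-mapped task receives the mapped results as a lazily loaded sequence, and turning that into a list (and returning it) forces every result to be read and stored again as one value. The plain-Python model below illustrates that assumption only; `LazyResults` and `collect` are hypothetical names, not Airflow classes.

```python
from typing import Dict, Iterator, List


class LazyResults:
    """Hypothetical model of a lazily loaded collection of mapped-task results.

    Nothing is read from the backing store until an element is requested,
    so a consumer that needs only a few items never pays for all of them.
    """

    def __init__(self, store: Dict[int, int]) -> None:
        self._store = store  # map_index -> result, one stored value per mapped instance
        self.reads = 0  # number of individual values fetched so far

    def __len__(self) -> int:
        return len(self._store)

    def __getitem__(self, index: int) -> int:
        if not 0 <= index < len(self._store):
            raise IndexError(index)
        self.reads += 1
        return self._store[index]

    def __iter__(self) -> Iterator[int]:
        for index in range(len(self)):
            yield self[index]


def collect(results: LazyResults) -> List[int]:
    # Hypothetical intermediate "collect" step: one task reads every mapped result
    # and returns them as a single list, i.e. one value whose size grows with N.
    return list(results)


store = {i: i * 2 + 10 for i in range(1_000)}
lazy = LazyResults(store)
_ = lazy[0]  # a consumer that needs one item triggers one read; lazy.reads is now 1
collected = collect(lazy)  # materializing touches every result; lazy.reads is now 1001
```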


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
