wangyaopw commented on issue #25032:
URL: https://github.com/apache/airflow/issues/25032#issuecomment-1363603332

   Hi, @potiuk 
   
   I don't think it is possible to expand a task group from the result of a task yet in 2.5.0.
   
   For example:
   
   ```
   from datetime import datetime

   from airflow import DAG
   from airflow.decorators import task, task_group
   
   @task
   def fetch_files():
       return ['file1', 'file2', 'file3']
   
   @task_group
   def etl_pipeline(next_file):
       @task
       def pre_process(file):
           return 'PreProcessed '+file
   
       @task
       def process(file):
           return 'Processed '+file
   
       @task
       def post_process(file):
           return 'PostProcessed '+file
   
       return post_process(process(pre_process(next_file)))
   
   
   with DAG(dag_id="task_group_mapping_test",
            schedule="@daily",
            start_date=datetime(2022, 1, 1),
            catchup=False) as dag:
       # Expanding over a literal list runs fine:
       etl_pipeline.expand(next_file=['file1', 'file2', 'file3'])

       # Expanding over a task result causes a scheduler error:
       # etl_pipeline.expand(next_file=fetch_files())
       # "airflow.models.expandinput.NotFullyPopulated: Failed to populate all
       # mapping metadata; missing: 'next_file'"
   ```
   
   Is it possible to expand a task group from a task result in release 2.5?

   I know that expanding a single task from a task result is possible.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
