wangyaopw commented on issue #25032:
URL: https://github.com/apache/airflow/issues/25032#issuecomment-1363603332
Hi @potiuk,
I don't think it is possible yet to expand a task group from the result of a
task in 2.5.0.
For example:
```
from datetime import datetime

from airflow import DAG
from airflow.decorators import task, task_group


@task
def fetch_files():
    return ['file1', 'file2', 'file3']


@task_group
def etl_pipeline(next_file):
    @task
    def pre_process(file):
        return 'PreProcessed ' + file

    @task
    def process(file):
        return 'Processed ' + file

    @task
    def post_process(file):
        return 'PostProcessed ' + file

    return post_process(process(pre_process(next_file)))


with DAG(dag_id="task_group_mapping_test",
         schedule="@daily",
         start_date=datetime(2022, 1, 1),
         catchup=False) as dag:
    # Expanding over a literal list runs fine:
    etl_pipeline.expand(next_file=['file1', 'file2', 'file3'])

    # Expanding over a task result causes a scheduler error:
    # etl_pipeline.expand(next_file=fetch_files())
    # airflow.models.expandinput.NotFullyPopulated: Failed to populate all
    # mapping metadata; missing: 'next_file'
```
Is it possible to expand a task group from a task result in release 2.5?
I know that expanding a single task from a task result is possible.