kn3609571 commented on issue #41378:
URL: https://github.com/apache/airflow/issues/41378#issuecomment-2855271799
I think this is expected behavior, since the results of `get_config_data`
are used as input for each task in the task group.
You can pass the result of each task as the input of its downstream tasks to
reduce the number of arrows in the graph.
(Please note that the return values and relationships should be adapted to
your needs.)
Example:
```python
from airflow.decorators import dag, task_group, task
from pendulum import datetime
from airflow.operators.empty import EmptyOperator
from constants.common_constants import TASK_BEGIN
# Task group that chains fetch -> process -> three staging copies -> main-table
# load for a single input value. Each task takes the return value of its
# upstream task as an argument, so the dependencies follow from the data flow.
#
# my_num: the value handed to the first task (fetch_data).
@task_group(
    group_id="Fetch_and_Process_Data",
    tooltip="This task group is very important!",
)
def demo_tash_group(my_num):
    @task
    def fetch_data(num):
        # Wrap the input in a dict that downstream tasks extend.
        print(num)
        return {"num": num, "fetched": {}}

    @task
    def process_data(fetch_res):
        print(fetch_res["num"])
        return {**fetch_res, "processed": {}}

    @task
    def copy_s3_to_ticket_staging(process_res):
        print(process_res["num"])
        return {**process_res, "copied": {}}

    @task
    def copy_s3_to_transaction_staging(process_res):
        print(process_res["num"])
        return {**process_res, "copied": {}}

    @task
    def copy_s3_to_payment_staging(process_res):
        print(process_res["num"])
        return {**process_res, "copied": {}}

    @task
    def copy_s3_into_main_tables(
        copy_ticket_res,
        copy_transaction_res,
        copy_payment_res,
    ):
        # Final step: consumes the results of all three staging copies.
        print(copy_ticket_res)
        print(copy_transaction_res)
        print(copy_payment_res)

    # Setting dependencies
    process_task = process_data(fetch_data(my_num))
    copy_ticket_task = copy_s3_to_ticket_staging(process_task)
    # Bound as ``copy_transaction_task`` so the name matches the argument
    # passed to copy_s3_into_main_tables below (it was previously bound under
    # a different name, which raised NameError).
    copy_transaction_task = copy_s3_to_transaction_staging(process_task)
    copy_payment_task = copy_s3_to_payment_staging(process_task)
    copy_s3_into_main_tables(
        copy_ticket_task,
        copy_transaction_task,
        copy_payment_task,
    )
# DAG that expands demo_tash_group over the list returned by get_config_data,
# bracketed by a "begin" and an "end" EmptyOperator.
@dag(
    start_date=datetime(2022, 12, 1),
    schedule=None,
    catchup=False,
    tags=["task"],
)
def task_group_mapping():
    begin = EmptyOperator(task_id=TASK_BEGIN)

    @task()
    def get_config_data():
        # It will be a list of dictionaries, fetched at the time of DAG
        # execution from my db
        return [19, 23, 42, 8, 7, 108]

    # Named ``mapped_group`` to avoid shadowing the imported ``task_group``
    # decorator.
    mapped_group = demo_tash_group.expand(my_num=get_config_data())
    end = EmptyOperator(task_id="end")

    # Setting dependencies
    begin >> mapped_group >> end


# Calling the decorated function instantiates the DAG.
task_group_mapping()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]