kn3609571 commented on issue #41378:
URL: https://github.com/apache/airflow/issues/41378#issuecomment-2855271799

   I think this is expected behavior, since the results of `get_config_data` 
are used as input for each task in the task group.
   You can pass each task's result as the input to its downstream task, which 
reduces the number of arrows in the graph.
   (Please note that the return values and relationships should be adapted to 
your needs.)
   
   Example:
   ```python
   from airflow.decorators import dag, task_group, task
   from pendulum import datetime
   from airflow.operators.empty import EmptyOperator
   
   from constants.common_constants import TASK_BEGIN
   
   
   @task_group(
       group_id="Fetch_and_Process_Data",
       tooltip="This task group is very important!",
   )
   def demo_tash_group(my_num):
       """Fetch, process and copy the data for one input value.

       Every task takes the return value of the task before it as its
       argument, so the ordering inside the group is expressed through the
       data that is passed along rather than through explicit ``>>`` edges.

       Args:
           my_num: The value handled by this group; forwarded to
               ``fetch_data``.
       """

       @task
       def fetch_data(num):
           """Print ``num`` and return it inside a new result dict."""
           print(num)
           return {"num": num, "fetched": {}}

       @task
       def process_data(fetch_res):
           """Print the fetched number and add a ``"processed"`` entry."""
           print(fetch_res["num"])
           return {**fetch_res, "processed": {}}

       @task
       def copy_s3_to_ticket_staging(process_res):
           """Print the processed number and add a ``"copied"`` entry."""
           print(process_res["num"])
           return {**process_res, "copied": {}}

       @task
       def copy_s3_to_transaction_staging(process_res):
           """Print the processed number and add a ``"copied"`` entry."""
           print(process_res["num"])
           return {**process_res, "copied": {}}

       @task
       def copy_s3_to_payment_staging(process_res):
           """Print the processed number and add a ``"copied"`` entry."""
           print(process_res["num"])
           return {**process_res, "copied": {}}

       @task
       def copy_s3_into_main_tables(
           copy_ticket_res, copy_transaction_res, copy_payment_res
       ):
           """Print the results of the three staging copies."""
           print(copy_ticket_res)
           print(copy_transaction_res)
           print(copy_payment_res)

       # Setting dependencies: fetch -> process -> three staging copies,
       # all of which feed the final copy into the main tables.
       process_task = process_data(fetch_data(my_num))
       copy_ticket_task = copy_s3_to_ticket_staging(process_task)
       copy_transaction_task = copy_s3_to_transaction_staging(process_task)
       copy_payment_task = copy_s3_to_payment_staging(process_task)
       copy_s3_into_main_tables(
           copy_ticket_task, copy_transaction_task, copy_payment_task
       )
   
   
   @dag(
       start_date=datetime(2022, 12, 1),
       schedule=None,
       catchup=False,
       tags=["task"],
   )
   def task_group_mapping():
       """Define a DAG that maps ``demo_tash_group`` over config values.

       Layout: ``begin`` -> mapped task group -> ``end``.
       """
       begin = EmptyOperator(task_id=TASK_BEGIN)

       @task()
       def get_config_data():
           """Return the values the task group is expanded over."""
           # It will be a list of dictionaries, fetched at the time of DAG
           # execution from my db
           return [19, 23, 42, 8, 7, 108]

       # Expand the task group over the list returned by get_config_data().
       # The local is deliberately not called `task_group`, which would
       # shadow the imported decorator of the same name.
       mapped_group = demo_tash_group.expand(my_num=get_config_data())

       end = EmptyOperator(task_id="end")

       # Setting dependencies
       begin >> mapped_group >> end
   
   
   # Call the @dag-decorated function at module level; presumably this is what
   # makes Airflow pick up the DAG (per the TaskFlow API) - confirm in docs.
   task_group_mapping()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to