Aqib5wani opened a new issue, #41378:
URL: https://github.com/apache/airflow/issues/41378

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.6.3
   
   ### What happened?
   
   Environment: Ubuntu 20.1, Python 3.10.14
   
   **Description:**
   
   I am experiencing issues with task group dependencies and dynamic task 
outputs when using the .expand method in Airflow. Below is the DAG code snippet 
that illustrates the problem:
   
   ```
   from airflow.decorators import dag, task_group, task
   from pendulum import datetime
   from airflow.operators.empty import EmptyOperator
   
   from constants.common_constants import TASK_BEGIN
   
   
   @task_group(group_id="Fetch_and_Process_Data", tooltip="This task group is very important!")
   def demo_tash_group(my_num):
       @task
       def fetch_data(num):
           print(num)
   
       @task
       def process_data(num):
           print(num)
   
       @task
       def copy_s3_to_ticket_staging(num):
           print(num)
   
       @task
       def copy_s3_to_transaction_staging(num):
           print(num)
    
       @task
       def copy_s3_to_payment_staging(num):
           print(num)
       
       @task
       def copy_s3_into_main_tables(num):
           print(num)
   
       # Setting dependencies
       (
           fetch_data(my_num)
           >> process_data(my_num)
           >> [
               copy_s3_to_ticket_staging(my_num),
               copy_s3_to_transaction_staging(my_num),
               copy_s3_to_payment_staging(my_num),
           ]
           >> copy_s3_into_main_tables(my_num)
       )
   
   
   @dag(
       start_date=datetime(2022, 12, 1),
       schedule=None,
       catchup=False,
       tags=["task"]
   )
   def task_group_mapping():
       begin = EmptyOperator(task_id=TASK_BEGIN)
   
       @task()
       def get_config_data():
           # It will be a list of dictionaries, fetched at the time of DAG execution from my db
           return [19, 23, 42, 8, 7, 108]
       
       task_group = demo_tash_group.expand(my_num=get_config_data())
   
       end = EmptyOperator(task_id="end")
   
       # Setting dependencies
       begin >> task_group >> end
   
   
   task_group_mapping()
   ```
   
   ### Problem Scenarios:
   
   **Scenario 1:**
   
   Issue: When I define get_config_data as a normal function (without the @task 
decorator), my DAG is created correctly with the task chain as defined. 
However, this approach isn't viable because the function runs every time the 
scheduler parses the file, which degrades performance, especially since my 
case involves fetching data from a database.
   
   This is how the DAG looks:
   
![normalfunction](https://github.com/user-attachments/assets/7efee703-d5fd-4889-954c-0b9a5da55dd6)
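
The parse-time cost described in Scenario 1 can be illustrated with a minimal plain-Python sketch. It does not use any Airflow API: `parse_dag_file_eager` and `parse_dag_file_deferred` are hypothetical stand-ins for one scheduler parse of the DAG file, and `query_count` is a hypothetical counter that replaces the real database call.

```python
# Hypothetical counter standing in for real database queries.
query_count = 0


def get_config_data():
    """Stand-in for the database lookup from the DAG above."""
    global query_count
    query_count += 1
    return [19, 23, 42, 8, 7, 108]


def parse_dag_file_eager():
    """Mimics one parse when get_config_data() is called as a normal function."""
    values = get_config_data()  # executed on every parse
    return [f"group_for_{value}" for value in values]


def parse_dag_file_deferred():
    """Mimics one parse when the lookup is deferred, as a task would be."""
    return get_config_data  # only a reference is kept; nothing runs yet


# Three parses with the normal function: three lookups.
for _ in range(3):
    parse_dag_file_eager()
eager_queries = query_count

# Three parses with the deferred version: no lookups until it is run.
query_count = 0
for _ in range(3):
    run_later = parse_dag_file_deferred()
deferred_queries_after_parsing = query_count

run_later()  # corresponds to the task executing during a DAG run
deferred_queries_after_run = query_count

print(eager_queries, deferred_queries_after_parsing, deferred_queries_after_run)
```

The counters only show where the work happens; how often the scheduler actually re-parses a file depends on the deployment's configuration.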
   
   
   **Scenario 2:**
   
   Issue: To address the performance issue, I added the @task decorator to 
get_config_data. This solved the first problem by ensuring the function only 
executes when the DAG runs. However, it introduced a new issue: every task that 
uses the output of get_config_data is attached directly to it in the DAG graph, 
whereas I want only the first task of the task group to be attached to it.
   
![fucnntionastask](https://github.com/user-attachments/assets/469c2d55-99d3-48a4-88d0-69389aae02f8)
   
   
   **Scenario 3:**
   Attempted Solution: I tried using the output of get_config_data in the first 
task of demo_tash_group and then pushing my_num into XCom, expecting the 
remaining tasks to fetch from XCom. However, this didn’t work because when 
fetching the XCom value, it retrieves a list of all XComs for every dynamic 
task. This makes it impossible to fetch the XCom value for a specific task 
group.
   
   ### What you think should happen instead?
   
   Expected Behavior:
   I expect the tasks within the task group to maintain the intended 
dependencies and not be directly attached to get_config_data, while still 
allowing the use of dynamic task outputs.
   
   ### How to reproduce
   
   To reproduce the issue, follow these steps:
   
   **Set up the Airflow environment:**
   
   - Ensure you have a working Airflow environment that matches, or is compatible with, the Airflow version reported above.
   
   **Create a new DAG:**
   
   - Create a new DAG file in your Airflow DAGs folder using the provided code snippet.
   - Ensure you have the necessary imports and helper files (constants.common_constants, helper.base_helper), or adjust the code to remove those dependencies if they're specific to your environment.
   
   **Scenario 1 - Define get_config_data as a normal function:**
   
   - In the DAG file, define get_config_data as a normal function without the @task decorator.
   - Load the DAG in the Airflow UI. You should see that the task chain is created as expected, with the task group dependencies preserved.
   - Note: While this approach works, it's not ideal because the function executes every time the scheduler parses the file, leading to potential performance degradation.
   
   **Scenario 2 - Use the @task decorator for get_config_data:**
   
   - Add the @task decorator to get_config_data in the DAG file to ensure it only runs when the DAG is triggered.
   - Load the DAG in the Airflow UI again.
   - Observe that the tasks using the output of get_config_data are now directly attached to it in the DAG chain, breaking the intended task group dependencies.
   
   **Scenario 3 - Attempt to use XCom:**
   
   - To try to solve the issue from Scenario 2, modify the DAG so that the output of get_config_data is used only in the first task of demo_tash_group.
   - Push the value of my_num into XCom within the first task and attempt to retrieve it in the subsequent tasks.
   - Observe that when fetching the XCom value, you receive a list of all XComs for every dynamic task, rather than the specific XCom value for the intended task group.
   
   **Document the observations:**
   
   - Take snapshots of the DAG structures in the Airflow UI for both Scenario 1 and Scenario 2 to illustrate the difference in task dependencies and the issues encountered.
   
   By following these steps, you should be able to reproduce the bug and observe the issues with task group dependencies and dynamic task outputs when using the .expand method in Airflow.
   
   ### Operating System
   
   Ubuntu
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.