Aqib5wani opened a new issue, #41378:
URL: https://github.com/apache/airflow/issues/41378
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.6.3
### What happened?
Environment: Ubuntu 20.1, Python 3.10.14
**Description:**
I am running into problems with task group dependencies and dynamic task
outputs when mapping a task group with `.expand()`. The DAG below illustrates
the problem:
```python
from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

# The original DAG imports TASK_BEGIN from a project-specific module
# (from constants.common_constants import TASK_BEGIN); a placeholder task ID
# is used here so the snippet runs on its own.
TASK_BEGIN = "begin"


@task_group(
    group_id="Fetch_and_Process_Data",
    tooltip="This task group is very important!",
)
def demo_tash_group(my_num):
    @task
    def fetch_data(num):
        print(num)

    @task
    def process_data(num):
        print(num)

    @task
    def copy_s3_to_ticket_staging(num):
        print(num)

    @task
    def copy_s3_to_transaction_staging(num):
        print(num)

    @task
    def copy_s3_to_payment_staging(num):
        print(num)

    @task
    def copy_s3_into_main_tables(num):
        print(num)

    # Setting dependencies: every task in the group receives my_num directly.
    fetch_data(my_num) >> process_data(my_num) >> [
        copy_s3_to_ticket_staging(my_num),
        copy_s3_to_transaction_staging(my_num),
        copy_s3_to_payment_staging(my_num),
    ] >> copy_s3_into_main_tables(my_num)


@dag(
    start_date=datetime(2022, 12, 1),
    schedule=None,
    catchup=False,
    tags=["task"],
)
def task_group_mapping():
    begin = EmptyOperator(task_id=TASK_BEGIN)

    @task()
    def get_config_data():
        # It will be a list of dictionaries, fetched from my DB at DAG
        # execution time.
        return [19, 23, 42, 8, 7, 108]

    task_group = demo_tash_group.expand(my_num=get_config_data())
    end = EmptyOperator(task_id="end")

    # Setting dependencies
    begin >> task_group >> end


task_group_mapping()
```
### Problem Scenarios:
**Scenario 1:**
Issue: When I define `get_config_data` as a normal function (without the
`@task` decorator), the DAG is created correctly, with the task chain as
defined. However, this approach isn't viable: the function then runs every
time the scheduler parses the file, which degrades performance, especially
because in my case it fetches data from a database.
This is how the DAG looks:

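The parse-time cost in Scenario 1 can be illustrated with a plain-Python toy model (no Airflow involved; `FakeDatabase`, `parse_dag_file_eager`, and `parse_dag_file_deferred` are hypothetical names). A function called at the top level of a DAG file runs on every parse, while a function that is only registered to run later, which is what wrapping `get_config_data` in `@task` does, is not called until the task executes:

```python
# Toy model only: simulates a DAG file being parsed repeatedly by the
# scheduler. None of these names are Airflow APIs.


class FakeDatabase:
    """Hypothetical stand-in for the config database; counts queries."""

    def __init__(self):
        self.queries = 0

    def fetch_config(self):
        self.queries += 1
        return [19, 23, 42, 8, 7, 108]


def parse_dag_file_eager(db):
    # Scenario 1: get_config_data is a plain function called while the file
    # is parsed, so every parse queries the database.
    return db.fetch_config()


def parse_dag_file_deferred(db):
    # Scenario 2: parsing only records what to call later; the query is
    # deferred until the task actually runs.
    return db.fetch_config


if __name__ == "__main__":
    db = FakeDatabase()
    for _ in range(3):
        parse_dag_file_eager(db)
    print("eager: queries after 3 parses =", db.queries)  # 3

    db = FakeDatabase()
    pending = [parse_dag_file_deferred(db) for _ in range(3)]
    print("deferred: queries after 3 parses =", db.queries)  # 0
    pending[0]()  # the task runs once
    print("deferred: queries after 1 run =", db.queries)  # 1
```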
**Scenario 2:**
Issue: To address the performance issue, I added the `@task` decorator to
`get_config_data`. This solved the first problem by ensuring the function only
runs when the DAG runs. However, it introduced a new issue: every task in the
group that uses the output of `get_config_data` is attached directly to it in
the DAG chain, whereas I want only the first task of the task group to be
attached to it.
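Why Scenario 2 looks this way can be shown with a plain-Python toy model. TaskFlow makes a task downstream of any task whose output it receives as an argument; inside the mapped group all six tasks receive `my_num`, the mapped output of `get_config_data`, so (as observed above) all six get an edge from it. The `Graph` and `Output` classes below are hypothetical stand-ins, not Airflow code, and only data-flow edges are modeled (the explicit `>>` chain is left out). The `pass_through=True` branch sketches one possible restructuring in which only `fetch_data` receives `my_num` and returns it for the other tasks; that each downstream task in a mapped task group would then receive the value from its own mapped instance is an assumption to verify against the Airflow documentation on mapping over task groups.

```python
# Toy model, NOT Airflow's API: it only mimics how TaskFlow adds an upstream
# edge for every argument that is another task's output (an XComArg).


class Output:
    """Stands in for a task's return value (the role an XComArg plays)."""

    def __init__(self, producer):
        self.producer = producer


class Graph:
    def __init__(self):
        self.edges = set()  # (upstream_task_id, downstream_task_id) pairs

    def call(self, task_id, *args):
        # Every argument produced by another task becomes an upstream edge.
        for arg in args:
            if isinstance(arg, Output):
                self.edges.add((arg.producer, task_id))
        return Output(task_id)


GROUP_TASKS = [
    "fetch_data",
    "process_data",
    "copy_s3_to_ticket_staging",
    "copy_s3_to_transaction_staging",
    "copy_s3_to_payment_staging",
    "copy_s3_into_main_tables",
]


def build(pass_through):
    g = Graph()
    my_num = g.call("get_config_data")
    if pass_through:
        # Hypothetical layout: only fetch_data takes my_num and returns it;
        # the other tasks take fetch_data's return value instead.
        fetched = g.call("fetch_data", my_num)
        for task_id in GROUP_TASKS[1:]:
            g.call(task_id, fetched)
    else:
        # Layout from the report: every task in the group takes my_num.
        for task_id in GROUP_TASKS:
            g.call(task_id, my_num)
    return g


def downstream_of(g, task_id):
    return sorted(down for up, down in g.edges if up == task_id)


if __name__ == "__main__":
    print(downstream_of(build(pass_through=False), "get_config_data"))
    # all six group tasks
    print(downstream_of(build(pass_through=True), "get_config_data"))
    # ['fetch_data']
```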

**Scenario 3:**
Attempted Solution: I tried passing the output of `get_config_data` only to the
first task of `demo_tash_group` and pushing `my_num` to XCom there, expecting
the remaining tasks to pull it from XCom. This didn't work: pulling the XCom
returns a list with the XComs of every mapped instance, so the remaining tasks
cannot get the value that belongs to their own task group instance.
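The problem in Scenario 3 is that a pull which does not name a map index returns one value per mapped instance. The plain-Python sketch below contrasts the two kinds of pull; `XComStore` and its methods are hypothetical stand-ins, not Airflow classes. In Airflow itself, `TaskInstance.xcom_pull` accepts a `map_indexes` argument and a running task instance exposes its own `map_index`, so passing `map_indexes=ti.map_index` may correspond to `pull_for_instance` below; this has not been tested against the version reported here.

```python
# Toy model, NOT Airflow's API: XComs keyed by (task_id, map_index), the way
# each mapped instance of a task group pushes its own value.


class XComStore:
    def __init__(self):
        self._values = {}

    def push(self, task_id, map_index, value):
        self._values[(task_id, map_index)] = value

    def pull_all(self, task_id):
        # No map index given: one value per mapped instance, ordered by index.
        return [
            value
            for (tid, _), value in sorted(self._values.items())
            if tid == task_id
        ]

    def pull_for_instance(self, task_id, map_index):
        # Map index given: only the value pushed by the same group instance.
        return self._values[(task_id, map_index)]


if __name__ == "__main__":
    store = XComStore()
    for map_index, num in enumerate([19, 23, 42, 8, 7, 108]):
        store.push("Fetch_and_Process_Data.fetch_data", map_index, num)

    print(store.pull_all("Fetch_and_Process_Data.fetch_data"))
    # [19, 23, 42, 8, 7, 108]
    print(store.pull_for_instance("Fetch_and_Process_Data.fetch_data", 2))
    # 42
```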
### What you think should happen instead?
Expected Behavior:
I expect the tasks within the task group to maintain the intended
dependencies and not be directly attached to get_config_data, while still
allowing the use of dynamic task outputs.
### How to reproduce
To reproduce the issue, follow these steps:

1. Set up the Airflow environment: ensure you have a working Airflow
   environment. The issue was observed in the Airflow version given above;
   make sure your setup matches it or is compatible with it.
2. Create a new DAG: create a DAG file in your Airflow DAGs folder from the
   provided code snippet. Make sure the helper modules
   (`constants.common_constants`, `helper.base_helper`) are available, or
   adjust the code to remove those dependencies if they are specific to your
   environment.
3. Scenario 1 - define `get_config_data` as a normal function:
   - In the DAG file, define `get_config_data` without the `@task` decorator.
   - Load the DAG in the Airflow UI. The task chain is created as expected,
     with the task group dependencies preserved.
   - Note: although this works, it is not ideal, because the function runs
     every time the scheduler parses the file, which can degrade performance.
4. Scenario 2 - use the `@task` decorator on `get_config_data`:
   - Add the `@task` decorator to `get_config_data` so that it only runs when
     the DAG is triggered.
   - Load the DAG in the Airflow UI again.
   - Observe that the tasks using the output of `get_config_data` are now
     attached directly to it in the DAG chain, breaking the intended task
     group dependencies.
5. Scenario 3 - attempt to use XCom:
   - To work around Scenario 2, modify the DAG so that the output of
     `get_config_data` is passed only to the first task of `demo_tash_group`.
   - Push the value of `my_num` to XCom in that first task and try to pull it
     in the subsequent tasks.
   - Observe that pulling the XCom returns a list of the XComs from every
     mapped instance, rather than the value for the intended task group
     instance.
6. Document the observations: take screenshots of the DAG structure in the
   Airflow UI for Scenario 1 and Scenario 2 to illustrate the difference in
   task dependencies.

By following these steps, you should be able to reproduce the bug and
observe the issues with task group dependencies and dynamic task outputs when
using the .expand method in Airflow.
### Operating System
Ubuntu
### Versions of Apache Airflow Providers
_No response_
### Deployment
Amazon (AWS) MWAA
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.