bbovenzi commented on PR #64271:
URL: https://github.com/apache/airflow/pull/64271#issuecomment-4201934777
Finally, since I am talking about traversing the nested `NodeResponse[]` many times, I would like to make sure we test this against a DAG with 1000+ tasks and a bunch of groups too.
Here's an example:
```
"""Large DAG with 1000 tasks for testing grid and graph views."""
from datetime import datetime
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
with DAG(
dag_id="large_dag_1000_tasks",
start_date=datetime(2025, 1, 1),
catchup=False,
schedule="@daily",
tags=["testing", "large", "performance"],
) as dag:
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
# Group 1: Data Ingestion (200 tasks)
with TaskGroup("data_ingestion", tooltip="Data ingestion tasks") as
ingestion_group:
ingestion_tasks = []
for i in range(200):
task = EmptyOperator(task_id=f"ingest_{i:03d}")
ingestion_tasks.append(task)
# Chain tasks in batches of 10
for i in range(0, len(ingestion_tasks) - 10, 10):
ingestion_tasks[i] >> ingestion_tasks[i + 10]
# Group 2: Data Processing (300 tasks with nested groups)
with TaskGroup("data_processing", tooltip="Data processing tasks") as
processing_group:
# Subgroup: Validation (100 tasks)
with TaskGroup("validation", tooltip="Validation tasks") as
validation_group:
validation_tasks = []
for i in range(100):
task = EmptyOperator(task_id=f"validate_{i:03d}")
validation_tasks.append(task)
# Fan out pattern
validation_tasks[0] >> validation_tasks[1:50]
for t in validation_tasks[1:50]:
t >> validation_tasks[50]
# Subgroup: Transformation (100 tasks)
with TaskGroup("transformation", tooltip="Transformation tasks") as
transform_group:
transform_tasks = []
for i in range(100):
task = BashOperator(
task_id=f"transform_{i:03d}",
bash_command=f"echo 'Transforming batch {i}'"
)
transform_tasks.append(task)
# Linear chain
for i in range(len(transform_tasks) - 1):
transform_tasks[i] >> transform_tasks[i + 1]
# Subgroup: Enrichment (100 tasks)
with TaskGroup("enrichment", tooltip="Enrichment tasks") as
enrichment_group:
enrichment_tasks = []
for i in range(100):
task = EmptyOperator(task_id=f"enrich_{i:03d}")
enrichment_tasks.append(task)
# Parallel execution
pass # All run in parallel
validation_group >> transform_group >> enrichment_group
# Group 3: Analytics (250 tasks with deeply nested groups)
with TaskGroup("analytics", tooltip="Analytics tasks") as
analytics_group:
with TaskGroup("metrics", tooltip="Metrics calculation") as
metrics_group:
with TaskGroup("daily_metrics", tooltip="Daily metrics") as
daily_metrics:
daily_tasks =
[EmptyOperator(task_id=f"daily_metric_{i:03d}") for i in range(50)]
daily_tasks[0] >> daily_tasks[1:]
with TaskGroup("weekly_metrics", tooltip="Weekly metrics") as
weekly_metrics:
weekly_tasks =
[EmptyOperator(task_id=f"weekly_metric_{i:03d}") for i in range(50)]
weekly_tasks[0] >> weekly_tasks[1:]
daily_metrics >> weekly_metrics
with TaskGroup("reports", tooltip="Report generation") as
reports_group:
report_tasks = []
for i in range(150):
task = EmptyOperator(task_id=f"report_{i:03d}")
report_tasks.append(task)
# Diamond pattern
report_tasks[0] >> report_tasks[1:75]
for t in report_tasks[1:75]:
t >> report_tasks[75:150]
metrics_group >> reports_group
# Group 4: Export (200 tasks)
with TaskGroup("export", tooltip="Data export tasks") as export_group:
export_tasks = []
for i in range(200):
task = EmptyOperator(task_id=f"export_{i:03d}")
export_tasks.append(task)
# Fan-in pattern
export_tasks[0:100] >> export_tasks[100]
export_tasks[100] >> export_tasks[101:200]
# Group 5: Cleanup (50 tasks)
with TaskGroup("cleanup", tooltip="Cleanup tasks") as cleanup_group:
cleanup_tasks = [EmptyOperator(task_id=f"cleanup_{i:03d}") for i in
range(50)]
# All parallel
# Wire up the main flow
start >> ingestion_group >> processing_group >> analytics_group >>
export_group >> cleanup_group >> end
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]