yimingpeng commented on PR #55130:
URL: https://github.com/apache/airflow/pull/55130#issuecomment-3244886073
Hello @guan404ming,
Thanks for all the great suggestions, I believe I have addressed them, also
made a few more changes, e.g., expanding to other pending tasks
(`isStatePending`) which was checking `running` only. Please help review
again, thanks.
Here is the expanded test dag from the original issue post:
```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.filesystem import FileSensor
from airflow.utils.task_group import TaskGroup
def task_that_fails():
    """Raise unconditionally so the task always lands in the failed state."""
    message = "This task always fails for testing"
    raise Exception(message)
def task_that_succeeds():
    """Print a confirmation message and return the literal string ``success``."""
    outcome = "success"
    print("This task succeeds")
    return outcome
# Test DAG that exercises many task states at once (running, queued, failed,
# upstream_failed, sensor waiting, scheduled, success) so the Gantt chart can
# be verified against each of them.
dag = DAG(
    'test_gantt_chart_dag',
    start_date=datetime(2025, 7, 1),
    schedule=None,
    is_paused_upon_creation=False,
    catchup=False,
    max_active_runs=1,
    default_args={
        'retries': 2,
        'retry_delay': timedelta(seconds=10),
    },
)

# Sleeps 180s so it stays in the "running" state for a long time.
# NOTE: the wrapped string literal from the paste is rejoined here — the
# command is a single line.
long_running_task = BashOperator(
    task_id="long_running_task",
    bash_command="echo 'Starting long task'; sleep 180; echo 'Long task completed'",
    dag=dag,
)

# Requests 10 slots of default_pool, so it sits "queued" while slots are busy.
queued_task = BashOperator(
    task_id="queued_task",
    bash_command="echo 'Queued task executed'; sleep 30",
    dag=dag,
    pool='default_pool',
    pool_slots=10,
)

# Always raises -> ends in the "failed" state (after the default 2 retries).
failing_task = PythonOperator(
    task_id="failing_task",
    python_callable=task_that_fails,
    dag=dag,
)

# Pokes for a file that never exists -> stays in the sensor-waiting state
# until the 300s timeout.
waiting_sensor = FileSensor(
    task_id="waiting_sensor",
    filepath="/tmp/nonexistent_file_for_testing.txt",
    timeout=300,
    poke_interval=30,
    dag=dag,
)

# Downstream of both the long-running task and the sensor, so it remains
# "scheduled"/not-yet-started for most of the run.
scheduled_task = BashOperator(
    task_id="scheduled_task",
    bash_command="echo 'Scheduled task executed'; sleep 15",
    dag=dag,
)

# Finishes quickly -> "success" state.
quick_success_task = PythonOperator(
    task_id="quick_success_task",
    python_callable=task_that_succeeds,
    dag=dag,
)

# Downstream of failing_task -> ends in the "upstream_failed" state.
upstream_failed_task = BashOperator(
    task_id="upstream_failed_task",
    bash_command="echo 'This should not run due to upstream failure'",
    dag=dag,
)

# Task group with a fan-out/fan-in shape; one parallel branch fails so the
# group shows mixed states in the chart.
with TaskGroup("processing_group", dag=dag) as processing_group:
    group_start = BashOperator(
        task_id="group_start",
        bash_command="echo 'Group starting'; sleep 5",
        dag=dag,
    )
    parallel_task_1 = BashOperator(
        task_id="parallel_task_1",
        bash_command="echo 'Parallel task 1 running'; sleep 60",
        dag=dag,
    )
    parallel_task_2 = PythonOperator(
        task_id="parallel_task_2",
        python_callable=task_that_fails,
        dag=dag,
    )
    parallel_task_3 = BashOperator(
        task_id="parallel_task_3",
        bash_command="echo 'Parallel task 3 running'; sleep 45",
        dag=dag,
    )
    # Runs even though one branch fails, as long as at least one succeeds.
    group_end = BashOperator(
        task_id="group_end",
        bash_command="echo 'Group completed'; sleep 10",
        trigger_rule="none_failed_min_one_success",
        dag=dag,
    )

    # Rejoined from the wrapped paste: a single fan-out/fan-in expression.
    group_start >> [parallel_task_1, parallel_task_2, parallel_task_3] >> group_end

final_task = BashOperator(
    task_id="final_task",
    bash_command="echo 'All processing completed'",
    dag=dag,
)

# Cross-task dependencies wiring the states described above together.
long_running_task >> scheduled_task
quick_success_task >> queued_task
failing_task >> upstream_failed_task
waiting_sensor >> scheduled_task
processing_group >> final_task
```
Here is the new recording after the fix is applied:
https://github.com/user-attachments/assets/f93615d0-8fba-4842-bebc-f5d580c16730
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]