rickscarta opened a new issue, #33216:
URL: https://github.com/apache/airflow/issues/33216

   ### Apache Airflow version
   
   2.6.3
   
   ### What happened
   
   This is a bit of an odd one. A DAG with fewer than 5 tasks and an `on_failure_callback` set will execute the failure callback twice if you manually mark all the tasks in the DAG run as failed in the UI. If there are 5 tasks, the failure callback is executed 3 times.
   
   
   ### What you think should happen instead
   
   The callback should be executed once.
   
   ### How to reproduce
   
   Followed the instructions [here](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html) to get the Docker Compose YAML file and start Airflow.
   
   Added the following DAG file (`TESTDAG.py`) to the dags folder:
   
   ```python
   import datetime as dt
   import time
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   
   def will_fail():
       time.sleep(300)
       raise Exception()
   
   def fail_cb(context):
       print("===================================")
       print("             FAILED")
       print("===================================")
   
   
   dag = DAG(
       dag_id='TESTING',
       start_date=dt.datetime(2023, 8, 1),
       schedule=None,
       on_failure_callback=fail_cb,
   )
   
   op1 = PythonOperator(
       task_id="willfail1",
       dag=dag,
       python_callable=will_fail
   )
   ```
   
   Turn the DAG on in the UI and start a run.
   
   In a terminal, connect to the scheduler container and tail the scheduler logs for that DAG file:
   
   `tail -f logs/scheduler/latest/TESTDAG.py.log`
   
   In the UI, select the task `willfail1` and mark it as failed.
   
   You will see in the scheduler logs that the callback `fail_cb` was called 
twice.
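   
   To make the duplicate invocations easier to tell apart in the scheduler log, the callback can print identifying details from the callback context. A minimal sketch, assuming the standard `run_id` template-context key and the `reason` key that Airflow adds when invoking DAG-level callbacks (both fetched defensively with `.get()`):
   
   ```python
   import datetime as dt
   
   
   def fail_cb(context):
       # "run_id" is a standard template-context key; "reason" is added when
       # a DAG-level failure callback is invoked. Use .get() in case either
       # key is absent.
       print("===================================")
       print(f"FAILED at {dt.datetime.now(dt.timezone.utc).isoformat()}")
       print(f"run_id: {context.get('run_id')}")
       print(f"reason: {context.get('reason')}")
       print("===================================")
   ```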
   
   
   Add 4 more identical operators so the DAG file now looks like this (the five tasks are written as a loop for brevity):
   
   ```python
   import datetime as dt
   import time
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   
   def will_fail():
       time.sleep(300)
       raise Exception()
   
   def fail_cb(context):
       print("===================================")
       print("             FAILED")
       print("===================================")
   
   
   dag = DAG(
       dag_id='TESTING',
       start_date=dt.datetime(2023, 8, 1),
       schedule=None,
       on_failure_callback=fail_cb,
   )
   
   # Five identical tasks: willfail1 .. willfail5
   for i in range(1, 6):
       PythonOperator(
           task_id=f"willfail{i}",
           dag=dag,
           python_callable=will_fail,
       )
   ```
   
   Refresh the UI and run the DAG again.
   
   Manually mark each task as failed.
   
   You will see in the scheduler log that the callback was called three times.
   
   
   Note: this does not happen when you mark the **DAG run** itself as failed (in that scenario, the callback executes exactly once); it only happens when you mark all the tasks as failed individually.
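   
   For anyone who wants to drive both scenarios without clicking through the UI, the stable REST API should offer equivalents. A sketch, assuming the docker-compose defaults (basic auth `airflow`/`airflow` on `localhost:8080`), the third-party `requests` package, and a placeholder `RUN_ID` you would substitute with the real run id; it may not exercise exactly the same code path as the UI buttons:
   
   ```python
   import requests
   
   BASE = "http://localhost:8080/api/v1"  # docker-compose default webserver
   AUTH = ("airflow", "airflow")          # docker-compose default credentials
   RUN_ID = "manual__2023-08-01T00:00:00+00:00"  # placeholder: use your run's id
   
   # Mark a single task instance failed (the path where the callback fires
   # more than once).
   requests.patch(
       f"{BASE}/dags/TESTING/dagRuns/{RUN_ID}/taskInstances/willfail1",
       json={"new_state": "failed"},
       auth=AUTH,
   )
   
   # Mark the whole DAG run failed (the callback fires exactly once here).
   requests.patch(
       f"{BASE}/dags/TESTING/dagRuns/{RUN_ID}",
       json={"state": "failed"},
       auth=AUTH,
   )
   ```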
   
   
   ### Operating System
   
   Mac OS Ventura 13.3.1
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow==2.6.3
   apache-airflow-providers-amazon==8.1.0
   apache-airflow-providers-celery==3.2.0
   apache-airflow-providers-cncf-kubernetes==7.0.0
   apache-airflow-providers-common-sql==1.5.1
   apache-airflow-providers-docker==3.7.0
   apache-airflow-providers-elasticsearch==4.5.0
   apache-airflow-providers-ftp==3.4.1
   apache-airflow-providers-google==10.1.1
   apache-airflow-providers-grpc==3.2.0
   apache-airflow-providers-hashicorp==3.4.0
   apache-airflow-providers-http==4.4.1
   apache-airflow-providers-imap==3.2.1
   apache-airflow-providers-microsoft-azure==6.1.1
   apache-airflow-providers-mysql==5.1.0
   apache-airflow-providers-odbc==3.3.0
   apache-airflow-providers-postgres==5.5.0
   apache-airflow-providers-redis==3.2.0
   apache-airflow-providers-sendgrid==3.2.0
   apache-airflow-providers-sftp==4.3.0
   apache-airflow-providers-slack==7.3.0
   apache-airflow-providers-snowflake==4.1.0
   apache-airflow-providers-sqlite==3.4.1
   apache-airflow-providers-ssh==3.7.0
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Followed the Docker Compose instructions [here](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html).
   
   Output of `docker version`:
   
   ```
   Client:
    Cloud integration: v1.0.33
    Version:           24.0.2
    API version:       1.43
    Go version:        go1.20.4
    Git commit:        cb74dfc
    Built:             Thu May 25 21:51:16 2023
    OS/Arch:           darwin/arm64
    Context:           desktop-linux
   
   Server: Docker Desktop 4.20.1 (110738)
    Engine:
     Version:          24.0.2
     API version:      1.43 (minimum version 1.12)
     Go version:       go1.20.4
     Git commit:       659604f
     Built:            Thu May 25 21:50:59 2023
     OS/Arch:          linux/arm64
     Experimental:     false
    containerd:
     Version:          1.6.21
     GitCommit:        3dce8eb055cbb6872793272b4f20ed16117344f8
    runc:
     Version:          1.1.7
     GitCommit:        v1.1.7-0-g860f061
    docker-init:
     Version:          0.19.0
     GitCommit:        de40ad0
   ```
   
   
   
   
   ### Anything else
   
   It was consistently reproducible for me given the above setup.
   
   I am willing to contribute as best I can, but I am not familiar with Airflow's internals and got stuck while trying to debug.
   
   I confirmed that [this function](https://github.com/apache/airflow/blob/569e32b26fd7541e3a4182ce57a7c02f03d11155/airflow/models/dag.py#L1377) is being called multiple times. As far as I could find, that function is referenced in two files:
   
    - [dagrun.py](https://github.com/apache/airflow/blob/569e32b26fd7541e3a4182ce57a7c02f03d11155/airflow/models/dagrun.py#L643)
    - [processor.py](https://github.com/apache/airflow/blob/569e32b26fd7541e3a4182ce57a7c02f03d11155/airflow/dag_processing/processor.py#L748)
   
   But I could not work out from there where those calls come from or what they correspond to in the UI.
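   
   One way to narrow it down further (a sketch; only the standard library is assumed) is to have the callback print its own call stack, so the scheduler log shows whether the duplicate invocations arrive via `dagrun.py`, `processor.py`, or both:
   
   ```python
   import traceback
   
   
   def fail_cb(context):
       # Print the stack that led here so the scheduler log reveals
       # which caller (dagrun.py, processor.py, ...) invoked the callback.
       print("=========== FAILED; invoked via: ===========")
       traceback.print_stack()
   ```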
   
   I tried approaching it from the other direction, looking for what gets called when I manually set the task to failed, but I am not familiar enough with React to [follow this](https://github.com/apache/airflow/blob/569e32b26fd7541e3a4182ce57a7c02f03d11155/airflow/www/static/js/dag/details/taskInstance/taskActions/MarkInstanceAs.tsx#L45).
   
   
   
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

