Bowrna opened a new issue, #27614: URL: https://github.com/apache/airflow/issues/27614
### Apache Airflow version

2.4.2

### What happened

I have applied task callbacks for the failure and success cases using a cluster policy for a specific DAG: [https://gist.github.com/Bowrna/7fdca8b546fc274edd068ffdae5b76f9](https://gist.github.com/Bowrna/7fdca8b546fc274edd068ffdae5b76f9)

I am attaching the cluster policy that I have applied here: https://gist.github.com/Bowrna/1994894beea39fa8e1c269591b7f0346

On executing the DAG, the success callback is correctly invoked once for every successful task, while the failure callback is invoked twice for a failed task.

### What you think should happen instead

Like the success callback, the failure callback should also be executed only once.

### How to reproduce

I have attached the sample DAG and the `airflow_local_settings.py` file to which I added the cluster policy. Running Airflow with these and executing the DAG, either manually or on schedule, logs the following:

```
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062485, "task_name": "runme_0", "task_duration": 1.178738, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062486, "task_name": "runme_1", "task_duration": 1.081635, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062488, "task_name": "runme_2", "task_duration": 1.071424, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062495, "task_name": "also_run_this", "task_duration": 0.075827, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062495, "task_name": "this_will_skip", "task_duration": 0.072133, "task_status": "skipped", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "failure", "unravel_timestamp": 1668062496, "task_name": "failure_case", "task_duration": 0.066066, "task_status": "failed", "task_operator": "PythonOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "failure", "unravel_timestamp": 1668062497, "task_name": "failure_case", "task_duration": 0.655575, "task_status": "failed", "task_operator": "PythonOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062498, "task_name": "run_after_loop", "task_duration": 0.073114, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062500, "task_name": "run_this_last", "task_duration": 0.078573, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
```

Here you can see that the failure case is logged twice. The two failure-case entries also have different durations (I checked whether I was double-logging in my code by mistake, but that is not the case).

### Operating System

macOS Monterey version 12.2.1

### Versions of Apache Airflow Providers

apache-airflow-providers-apache-hive==4.0.1
apache-airflow-providers-cncf-kubernetes==4.4.0
apache-airflow-providers-common-sql==1.2.0
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-sqlite==3.2.1

### Deployment

Virtualenv installation

### Deployment details

I am currently running Airflow in development mode, testing callbacks and cluster policies.

### Anything else

The failure callback is executed once as part of the `taskinstance` callback flow, and a second time as part of `DagFileProcessorProcess`. I pulled out the logs for the specific task that is getting logged twice; the log below is from the `taskinstance.py` flow, found in the task log of the specific dag_run:

```
[2022-11-09, 09:59:40 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/Users/unravel/unravel_airflow/unravel_airflow/lib/python3.10/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/Users/unravel/unravel_airflow/unravel_airflow/lib/python3.10/site-packages/airflow/operators/python.py", line 193, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/Users/unravel/unravel_airflow/airflow/dags/test.py", line 67, in fail_case
    raise AirflowFailException('Failure case test for cluster policy')
airflow.exceptions.AirflowFailException: Failure case test for
cluster policy
[2022-11-09, 09:59:40 UTC] {taskinstance.py:851} DEBUG - Refreshing TaskInstance <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [running]> from DB
[2022-11-09, 09:59:40 UTC] {taskinstance.py:2325} DEBUG - Task Duration set to 0.061723
[2022-11-09, 09:59:40 UTC] {taskinstance.py:1412} DEBUG - Clearing next_method and next_kwargs.
[2022-11-09, 09:59:40 UTC] {taskinstance.py:1401} INFO - Immediate failure requested. Marking task as FAILED. dag_id=test_bowrna, task_id=failure_case, execution_date=20221109T095932, start_date=20221109T095940, end_date=20221109T095940
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Cluster dag policy:Task has failed
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Cluster policy:context for failure case: <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [failed]>
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Task id: failure_case
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Task id: <function task_failure_alert at 0x10749a050>
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Path to write: /Users/unravel/unravel_airflow/airflow/event_logger/test_bowrna/09-11-2022/manual__2022-11-09T09:59:32.540415+00:00.json
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Info details to log: {'dag_name': 'test_bowrna', 'dag_run_name': 'manual__2022-11-09T09:59:32.540415+00:00', 'status_callback': 'failure', 'unravel_timestamp': 1667987980, 'task_name': 'failure_case', 'task_duration': 0.061723, 'task_status': <TaskInstanceState.FAILED: 'failed'>, 'task_operator': 'PythonOperator', 'dag_start_date': datetime.datetime(2022, 11, 9, 9, 59, 34, 24988, tzinfo=Timezone('UTC')), 'dag_end_date': None, 'dag_state': 'running'}
[2022-11-09, 09:59:40 UTC] {events.py:45} DEBUG - session flush listener: added [<TaskInstanceState.FAILED: 'failed'>] unchanged () deleted ['running'] - <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [failed]>
[2022-11-09, 09:59:40 UTC] {cli_action_loggers.py:83} DEBUG - Calling callbacks: []
[2022-11-09, 09:59:40 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 1318 for task failure_case ('TaskInstance' object has no attribute 'task'; 49534)
[2022-11-09, 09:59:40 UTC] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-11-09, 09:59:40 UTC] {dagrun.py:674} DEBUG - number of tis tasks for <DagRun test_bowrna @ 2022-11-09 09:59:32.540415+00:00: manual__2022-11-09T09:59:32.540415+00:00, state:running, queued_at: 2022-11-09 09:59:32.551538+00:00. externally triggered: True>: 0 task(s)
[2022-11-09, 09:59:40 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

I also found another log in the `airflow scheduler`:

```
[2022-11-09T15:29:41.148+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.148+0530] {dagbag.py:502} DEBUG - Loaded DAG <DAG: test_bowrna>
[2022-11-09T15:29:41.149+0530] {processor.py:766} INFO - DAG(s) dict_keys(['test_bowrna']) retrieved from /Users/unravel/unravel_airflow/airflow/dags/test.py
[2022-11-09T15:29:41.149+0530] {processor.py:639} DEBUG - Processing Callback Request: {'full_filepath': '/Users/unravel/unravel_airflow/airflow/dags/test.py', 'processor_subdir': '/Users/unravel/unravel_airflow/airflow/dags', 'msg': "{'DAG Id': 'test_bowrna', 'Task Id': 'failure_case', 'Run Id': 'manual__2022-11-09T09:59:32.540415+00:00', 'Hostname': 'unravels-macbook-pro.local'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x1049f7a60>, 'is_failure_callback': True}
[2022-11-09T15:29:41.198+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.198+0530] {taskinstance.py:1853} ERROR - {'DAG Id': 'test_bowrna', 'Task Id': 'failure_case', 'Run Id': 'manual__2022-11-09T09:59:32.540415+00:00', 'Hostname': 'unravels-macbook-pro.local'}
[2022-11-09T15:29:41.199+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.199+0530] {taskinstance.py:851} DEBUG - Refreshing TaskInstance <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [running]> from DB
[2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.201+0530] {taskinstance.py:2325} DEBUG - Task Duration set to 0.520437
[2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.201+0530] {taskinstance.py:1412} DEBUG - Clearing next_method and next_kwargs.
[2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.201+0530] {plugins_manager.py:300} DEBUG - Loading plugins
[2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.201+0530] {plugins_manager.py:244} DEBUG - Loading plugins from directory: /Users/unravel/unravel_airflow/airflow/plugins
[2022-11-09T15:29:41.202+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.202+0530] {plugins_manager.py:259} DEBUG - Importing plugin module /Users/unravel/unravel_airflow/airflow/plugins/plugin.py
[2022-11-09T15:29:41.202+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.202+0530] {plugins_manager.py:259} DEBUG - Importing plugin module /Users/unravel/unravel_airflow/airflow/plugins/listener.py
[2022-11-09T15:29:41.204+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.203+0530] {plugins_manager.py:224} DEBUG - Loading plugins from entrypoints
[2022-11-09T15:29:41.228+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.228+0530] {plugins_manager.py:316} DEBUG - Loading 1 plugin(s) took 0.03 seconds
[2022-11-09T15:29:41.228+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.228+0530] {plugins_manager.py:476} DEBUG - Integrate DAG plugins
[2022-11-09T15:29:41.230+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.230+0530] {taskinstance.py:1099} DEBUG - previous_execution_date was called
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.231+0530] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_bowrna, task_id=failure_case, execution_date=20221109T095932, start_date=20221109T095940, end_date=20221109T095941
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Cluster dag policy:Task has failed
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Cluster policy:context for failure case: <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [failed]>
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Task id: failure_case
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Task id: <function task_failure_alert at 0x1048c9240>
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Path to write: /Users/unravel/unravel_airflow/airflow/event_logger/test_bowrna/09-11-2022/manual__2022-11-09T09:59:32.540415+00:00.json
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Info details to log: {'dag_name': 'test_bowrna', 'dag_run_name': 'manual__2022-11-09T09:59:32.540415+00:00', 'status_callback': 'failure', 'unravel_timestamp': 1667987981, 'task_name': 'failure_case', 'task_duration': 0.520437, 'task_status': <TaskInstanceState.FAILED: 'failed'>, 'task_operator': 'PythonOperator', 'dag_start_date': datetime.datetime(2022, 11, 9, 9, 59, 34, 24988, tzinfo=Timezone('UTC')), 'dag_end_date': None, 'dag_state': 'running'}
[2022-11-09T15:29:41.234+0530] {processor.py:725} INFO - Executed failure callback for <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [failed]> in state failed
[2022-11-09T15:29:41.236+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.236+0530] {dagbag.py:647} DEBUG - Running dagbag.sync_to_db with retries. Try 1 of
```

This also shows the failure callback being executed from the `processor.py` flow of the code.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
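[Editor's note, not part of the original report] While the failure callback fires both from the worker's `taskinstance` flow and again from the scheduler's `DagFileProcessorProcess`, one workaround is to make the callback's side effect idempotent, so the second invocation becomes a no-op. Below is a minimal sketch assuming the event dict shape shown in the JSON logs above; `log_event_once` and its marker-file scheme are hypothetical and not part of the attached gists:

```python
import json
from pathlib import Path


def log_event_once(event: dict, base_dir: str = "/tmp/event_logger") -> bool:
    """Write the event only if this (dag, task, run, status) combination
    has not been logged yet. Returns True if the event was written,
    False if a previous invocation already logged it."""
    # Build a filesystem-safe dedup key from the fields that identify
    # one callback invocation (run ids contain ':' and '+', so sanitize).
    key = "__".join(
        str(event[k])
        for k in ("dag_name", "task_name", "dag_run_name", "status_callback")
    ).replace("/", "_").replace(":", "_")
    marker = Path(base_dir) / f"{key}.json"
    marker.parent.mkdir(parents=True, exist_ok=True)
    if marker.exists():
        # Duplicate invocation (e.g. the DagFileProcessorProcess replaying
        # the failure callback): skip the write.
        return False
    marker.write_text(json.dumps(event, default=str))
    return True
```

Calling this from the cluster-policy callback instead of writing the JSON directly would leave a single entry per failed task, at the cost of keeping marker files around for the lifetime of the run.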
