Bowrna opened a new issue, #27614:
URL: https://github.com/apache/airflow/issues/27614

   ### Apache Airflow version
   
   2.4.2
   
   ### What happened
   
   I applied task-level success and failure callbacks through a cluster policy to a specific DAG: [https://gist.github.com/Bowrna/7fdca8b546fc274edd068ffdae5b76f9](https://gist.github.com/Bowrna/7fdca8b546fc274edd068ffdae5b76f9)
   
   I am attaching the cluster policy that I have applied here.
   https://gist.github.com/Bowrna/1994894beea39fa8e1c269591b7f0346
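
For readers who cannot open the gist, here is a minimal sketch of how a cluster policy in `airflow_local_settings.py` can attach callbacks to every task of one DAG. It is an illustration, not a copy of the gist: `TARGET_DAG_ID`, `task_success_alert`, and both callback bodies are assumptions; only the name `task_failure_alert` and the DAG id `test_bowrna` appear in the logs in this report.

```python
# Sketch of an airflow_local_settings.py cluster policy (assumed layout).
TARGET_DAG_ID = "test_bowrna"  # DAG id taken from the logs in this report


def task_success_alert(context):
    """Hypothetical success callback; the real body is in the gist."""
    print(f"Task succeeded: {context['task_instance'].task_id}")


def task_failure_alert(context):
    """Hypothetical failure callback body; only the name appears in the logs."""
    print(f"Task failed: {context['task_instance'].task_id}")


def task_policy(task):
    """Cluster policy hook: Airflow calls this for each task it loads."""
    if task.dag_id != TARGET_DAG_ID:
        return  # leave tasks of other DAGs untouched
    task.on_success_callback = task_success_alert
    task.on_failure_callback = task_failure_alert
```

Because the policy sets the callbacks on the operator itself, any code path that loads the DAG and runs the task's failure handling sees the same `on_failure_callback`.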
   
   When the DAG runs, the success callback is correctly invoked once for every successful task, but the failure callback is invoked twice for a single failed task.
   
   ### What you think should happen instead
   
   Like the success callback, the failure callback should be executed only once per failed task.
   
   ### How to reproduce
   
   I have attached the sample DAG and the `airflow_local_settings.py` file containing the cluster policy. Running Airflow with these files and triggering the DAG, either manually or on schedule, logs the following:
   
   ```
   {"dag_name": "test_bowrna", "dag_run_name": 
"scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", 
"unravel_timestamp": 1668062485, "task_name": "runme_0", "task_duration": 
1.178738, "task_status": "success", "task_operator": "BashOperator", 
"dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, 
"dag_state": "running"}
   {"dag_name": "test_bowrna", "dag_run_name": 
"scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", 
"unravel_timestamp": 1668062486, "task_name": "runme_1", "task_duration": 
1.081635, "task_status": "success", "task_operator": "BashOperator", 
"dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, 
"dag_state": "running"}
   {"dag_name": "test_bowrna", "dag_run_name": 
"scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", 
"unravel_timestamp": 1668062488, "task_name": "runme_2", "task_duration": 
1.071424, "task_status": "success", "task_operator": "BashOperator", 
"dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, 
"dag_state": "running"}
   {"dag_name": "test_bowrna", "dag_run_name": 
"scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", 
"unravel_timestamp": 1668062495, "task_name": "also_run_this", "task_duration": 
0.075827, "task_status": "success", "task_operator": "BashOperator", 
"dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, 
"dag_state": "running"}
   {"dag_name": "test_bowrna", "dag_run_name": 
"scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", 
"unravel_timestamp": 1668062495, "task_name": "this_will_skip", 
"task_duration": 0.072133, "task_status": "skipped", "task_operator": 
"BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", 
"dag_end_date": null, "dag_state": "running"}
   {"dag_name": "test_bowrna", "dag_run_name": 
"scheduled__2022-11-09T00:00:00+00:00", "status_callback": "failure", 
"unravel_timestamp": 1668062496, "task_name": "failure_case", "task_duration": 
0.066066, "task_status": "failed", "task_operator": "PythonOperator", 
"dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, 
"dag_state": "running"}
   {"dag_name": "test_bowrna", "dag_run_name": 
"scheduled__2022-11-09T00:00:00+00:00", "status_callback": "failure", 
"unravel_timestamp": 1668062497, "task_name": "failure_case", "task_duration": 
0.655575, "task_status": "failed", "task_operator": "PythonOperator", 
"dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, 
"dag_state": "running"}
   {"dag_name": "test_bowrna", "dag_run_name": 
"scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", 
"unravel_timestamp": 1668062498, "task_name": "run_after_loop", 
"task_duration": 0.073114, "task_status": "success", "task_operator": 
"BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", 
"dag_end_date": null, "dag_state": "running"}
   {"dag_name": "test_bowrna", "dag_run_name": 
"scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", 
"unravel_timestamp": 1668062500, "task_name": "run_this_last", "task_duration": 
0.078573, "task_status": "success", "task_operator": "BashOperator", 
"dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, 
"dag_state": "running"}
   ```
   Here the `failure_case` task is logged twice, and the two entries report different durations (0.066066 and 0.655575). I checked whether my code logs twice by mistake, and it does not.
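
The duplicate can also be confirmed mechanically rather than by reading the output. The sketch below assumes the callback output is available as one JSON object per line, as in the log above; the helper names `count_failure_callbacks` and `duplicated_failures` are made up for this example, and the sample records are trimmed to the fields the check needs.

```python
import json
from collections import Counter


def count_failure_callbacks(lines):
    """Count failure-callback records per (dag_run_name, task_name)."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines between records
        record = json.loads(line)
        if record.get("status_callback") == "failure":
            counts[(record["dag_run_name"], record["task_name"])] += 1
    return counts


def duplicated_failures(lines):
    """Return only the keys whose failure callback was logged more than once."""
    return {key: n for key, n in count_failure_callbacks(lines).items() if n > 1}


if __name__ == "__main__":
    run = "scheduled__2022-11-09T00:00:00+00:00"
    sample = [
        json.dumps({"dag_run_name": run, "status_callback": "success", "task_name": "runme_0"}),
        json.dumps({"dag_run_name": run, "status_callback": "failure", "task_name": "failure_case"}),
        json.dumps({"dag_run_name": run, "status_callback": "failure", "task_name": "failure_case"}),
    ]
    print(duplicated_failures(sample))
```

For the sample records, the only key reported is the `failure_case` task of the scheduled run, with a count of 2.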
   
   ### Operating System
   
   macOS Monterey version 12.2.1
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-hive==4.0.1
   apache-airflow-providers-cncf-kubernetes==4.4.0
   apache-airflow-providers-common-sql==1.2.0
   apache-airflow-providers-ftp==3.1.0
   apache-airflow-providers-http==4.0.0
   apache-airflow-providers-imap==3.0.0
   apache-airflow-providers-sqlite==3.2.1
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   I am currently running Airflow in development mode to test callbacks and cluster policies.
   
   ### Anything else
   
   The callback is executed once as part of the `taskinstance` flow and a second time as part of `DagFileProcessorProcess`.
   I pulled the logs for the task that is logged twice; the first set comes from the `taskinstance.py` flow.
   
   I found the following log in the task log of that DAG run:
   ```
   [2022-11-09, 09:59:40 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
   Traceback (most recent call last):
     File 
"/Users/unravel/unravel_airflow/unravel_airflow/lib/python3.10/site-packages/airflow/operators/python.py",
 line 175, in execute
       return_value = self.execute_callable()
     File 
"/Users/unravel/unravel_airflow/unravel_airflow/lib/python3.10/site-packages/airflow/operators/python.py",
 line 193, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
     File "/Users/unravel/unravel_airflow/airflow/dags/test.py", line 67, in 
fail_case
       raise AirflowFailException('Failure case test for cluster policy')
   airflow.exceptions.AirflowFailException: Failure case test for cluster policy
   [2022-11-09, 09:59:40 UTC] {taskinstance.py:851} DEBUG - Refreshing 
TaskInstance <TaskInstance: test_bowrna.failure_case 
manual__2022-11-09T09:59:32.540415+00:00 [running]> from DB
   [2022-11-09, 09:59:40 UTC] {taskinstance.py:2325} DEBUG - Task Duration set 
to 0.061723
   [2022-11-09, 09:59:40 UTC] {taskinstance.py:1412} DEBUG - Clearing 
next_method and next_kwargs.
   [2022-11-09, 09:59:40 UTC] {taskinstance.py:1401} INFO - Immediate failure 
requested. Marking task as FAILED. dag_id=test_bowrna, task_id=failure_case, 
execution_date=20221109T095932, start_date=20221109T095940, 
end_date=20221109T095940
   [2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Cluster dag 
policy:Task has failed
   [2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Cluster 
policy:context for failure case: <TaskInstance: test_bowrna.failure_case 
manual__2022-11-09T09:59:32.540415+00:00 [failed]>
   [2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Task id: 
failure_case
   [2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Task id: <function 
task_failure_alert at 0x10749a050>
   [2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Path to write: 
/Users/unravel/unravel_airflow/airflow/event_logger/test_bowrna/09-11-2022/manual__2022-11-09T09:59:32.540415+00:00.json
   [2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Info details to 
log: {'dag_name': 'test_bowrna', 'dag_run_name': 
'manual__2022-11-09T09:59:32.540415+00:00', 'status_callback': 'failure', 
'unravel_timestamp': 1667987980, 'task_name': 'failure_case', 'task_duration': 
0.061723, 'task_status': <TaskInstanceState.FAILED: 'failed'>, 'task_operator': 
'PythonOperator', 'dag_start_date': datetime.datetime(2022, 11, 9, 9, 59, 34, 
24988, tzinfo=Timezone('UTC')), 'dag_end_date': None, 'dag_state': 'running'}
   [2022-11-09, 09:59:40 UTC] {events.py:45} DEBUG - session flush listener: 
added [<TaskInstanceState.FAILED: 'failed'>] unchanged () deleted ['running'] - 
<TaskInstance: test_bowrna.failure_case 
manual__2022-11-09T09:59:32.540415+00:00 [failed]>
   [2022-11-09, 09:59:40 UTC] {cli_action_loggers.py:83} DEBUG - Calling 
callbacks: []
   [2022-11-09, 09:59:40 UTC] {standard_task_runner.py:100} ERROR - Failed to 
execute job 1318 for task failure_case ('TaskInstance' object has no attribute 
'task'; 49534)
   [2022-11-09, 09:59:40 UTC] {local_task_job.py:164} INFO - Task exited with 
return code 1
   [2022-11-09, 09:59:40 UTC] {dagrun.py:674} DEBUG - number of tis tasks for 
<DagRun test_bowrna @ 2022-11-09 09:59:32.540415+00:00: 
manual__2022-11-09T09:59:32.540415+00:00, state:running, queued_at: 2022-11-09 
09:59:32.551538+00:00. externally triggered: True>: 0 task(s)
   [2022-11-09, 09:59:40 UTC] {local_task_job.py:273} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   
   I also found this log in the `airflow scheduler` output:
   
   ```
   [2022-11-09T15:29:41.148+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.148+0530] {dagbag.py:502} DEBUG - 
Loaded DAG <DAG: test_bowrna>
   [2022-11-09T15:29:41.149+0530] 
{processor.py:766} INFO - DAG(s) 
dict_keys(['test_bowrna']) retrieved from 
/Users/unravel/unravel_airflow/airflow/dags/test.py
   [2022-11-09T15:29:41.149+0530] 
{processor.py:639} DEBUG - Processing Callback 
Request: {'full_filepath': 
'/Users/unravel/unravel_airflow/airflow/dags/test.py', 'processor_subdir': 
'/Users/unravel/unravel_airflow/airflow/dags', 'msg': "{'DAG Id': 
'test_bowrna', 'Task Id': 'failure_case', 'Run Id': 
'manual__2022-11-09T09:59:32.540415+00:00', 'Hostname': 
'unravels-macbook-pro.local'}", 'simple_task_instance': 
<airflow.models.taskinstance.SimpleTaskInstance object at 0x1049f7a60>, 
'is_failure_callback': True}
   [2022-11-09T15:29:41.198+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.198+0530] 
{taskinstance.py:1853} ERROR - {'DAG Id': 
'test_bowrna', 'Task Id': 'failure_case', 'Run Id': 
'manual__2022-11-09T09:59:32.540415+00:00', 'Hostname': 
'unravels-macbook-pro.local'}
   [2022-11-09T15:29:41.199+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.199+0530] 
{taskinstance.py:851} DEBUG - Refreshing 
TaskInstance <TaskInstance: test_bowrna.failure_case 
manual__2022-11-09T09:59:32.540415+00:00 [running]> from DB
   [2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.201+0530] 
{taskinstance.py:2325} DEBUG - Task Duration 
set to 0.520437
   [2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.201+0530] 
{taskinstance.py:1412} DEBUG - Clearing 
next_method and next_kwargs.
   [2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.201+0530] {plugins_manager.py:300} DEBUG - Loading plugins
   [2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.201+0530] {plugins_manager.py:244} DEBUG - Loading plugins 
from directory: /Users/unravel/unravel_airflow/airflow/plugins
   [2022-11-09T15:29:41.202+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.202+0530] {plugins_manager.py:259} DEBUG - Importing 
plugin module /Users/unravel/unravel_airflow/airflow/plugins/plugin.py
   [2022-11-09T15:29:41.202+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.202+0530] {plugins_manager.py:259} DEBUG - Importing 
plugin module /Users/unravel/unravel_airflow/airflow/plugins/listener.py
   [2022-11-09T15:29:41.204+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.203+0530] {plugins_manager.py:224} DEBUG - Loading plugins 
from entrypoints
   [2022-11-09T15:29:41.228+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.228+0530] {plugins_manager.py:316} DEBUG - Loading 1 
plugin(s) took 0.03 seconds
   [2022-11-09T15:29:41.228+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.228+0530] {plugins_manager.py:476} DEBUG - Integrate DAG 
plugins
   [2022-11-09T15:29:41.230+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.230+0530] 
{taskinstance.py:1099} DEBUG - 
previous_execution_date was called
   [2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.231+0530] 
{taskinstance.py:1401} INFO - Marking task as 
FAILED. dag_id=test_bowrna, task_id=failure_case, 
execution_date=20221109T095932, start_date=20221109T095940, 
end_date=20221109T095941
   [2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Cluster dag 
policy:Task has failed
   [2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Cluster 
policy:context for failure case: <TaskInstance: test_bowrna.failure_case 
manual__2022-11-09T09:59:32.540415+00:00 [failed]>
   [2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Task id: 
failure_case
   [2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Task id: 
<function task_failure_alert at 0x1048c9240>
   [2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Path to write: 
/Users/unravel/unravel_airflow/airflow/event_logger/test_bowrna/09-11-2022/manual__2022-11-09T09:59:32.540415+00:00.json
   [2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Info details to 
log: {'dag_name': 'test_bowrna', 'dag_run_name': 
'manual__2022-11-09T09:59:32.540415+00:00', 'status_callback': 'failure', 
'unravel_timestamp': 1667987981, 'task_name': 'failure_case', 'task_duration': 
0.520437, 'task_status': <TaskInstanceState.FAILED: 'failed'>, 'task_operator': 
'PythonOperator', 'dag_start_date': datetime.datetime(2022, 11, 9, 9, 59, 34, 
24988, tzinfo=Timezone('UTC')), 'dag_end_date': None, 'dag_state': 'running'}
   [2022-11-09T15:29:41.234+0530] 
{processor.py:725} INFO - Executed failure callback 
for <TaskInstance: test_bowrna.failure_case 
manual__2022-11-09T09:59:32.540415+00:00 [failed]> in state failed
   [2022-11-09T15:29:41.236+0530] {logging_mixin.py:120} INFO - 
[2022-11-09T15:29:41.236+0530] {dagbag.py:647} DEBUG - 
Running dagbag.sync_to_db with retries. Try 1 of
   ```
   This scheduler log also shows the failure callback being executed, this time from the `processor.py` flow.
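
Until the double invocation itself is resolved, one possible workaround is to make the callback idempotent. Since the two invocations above run in different processes (the task process and the DAG file processor), an in-memory flag would not help, so this sketch uses a marker file created atomically. Everything here is an assumption for illustration: `MARKER_DIR`, `claim_once`, and the callback body are hypothetical, and the marker key ignores reruns, so clearing and rerunning the task would require removing the old marker.

```python
import os
import tempfile
from pathlib import Path

# Hypothetical marker location; it must be visible to both the task process
# and the scheduler's DAG file processor.
MARKER_DIR = Path(tempfile.gettempdir()) / "airflow_callback_markers"


def claim_once(key, marker_dir=MARKER_DIR):
    """Return True for the first caller with this key, False for later ones.

    O_CREAT | O_EXCL makes os.open fail if the file already exists, so only
    one process can create the marker for a given key.
    """
    marker_dir = Path(marker_dir)
    marker_dir.mkdir(parents=True, exist_ok=True)
    # Replace characters such as ':' and '+' that appear in run ids.
    safe_name = "".join(c if c.isalnum() or c in "-_." else "_" for c in key)
    try:
        fd = os.open(marker_dir / safe_name, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    return True


def task_failure_alert(context):
    """Failure callback that does its work only on the first invocation."""
    ti = context["task_instance"]
    key = f"{ti.dag_id}__{ti.run_id}__{ti.task_id}"
    if not claim_once(key):
        return  # another process already handled this failure
    print(f"Task failed: {ti.task_id}")
```

This only hides the symptom in the callback's side effects; the callback is still invoked twice.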
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

