mobuchowski commented on code in PR #44547:
URL: https://github.com/apache/airflow/pull/44547#discussion_r1888434231


##########
airflow/models/taskinstance.py:
##########
@@ -1127,7 +1127,8 @@ def _handle_failure(
         )
 
     if not test_mode:
-        TaskInstance.save_to_db(failure_context["ti"], session)
+        task_instance.dag_run.refresh_from_db()

Review Comment:
   > I am not sure about this one. Here you refresh the DagRun and not the TI from DB. In the error description/PR Header you write the task instance might be updated twice.
   
   Ah yes, I did not write this clearly enough - I meant for the description to be read together with the title - it's the DagRun that's updated twice.
   
   > But usually when you mark a TI as "failed" manually it is terminated. Is this error situation ALWAYS happening or only in a race when you mark it manually and the TI is close to completion?
   
   This also happens when the task is terminated; it still goes into the `handle_failure` method:
   
   ```
           except (AirflowTaskTimeout, AirflowException, AirflowTaskTerminated) as e:
               if not test_mode:
                   ti.refresh_from_db(lock_for_update=True, session=session)
               # for case when task is marked as success/failed externally
               # or dagrun timed out and task is marked as skipped
               # current behavior doesn't hit the callbacks
               if ti.state in State.finished:
                   ti.clear_next_method_args()
                   TaskInstance.save_to_db(ti=ti, session=session)
                   return None
               else:
                   ti.handle_failure(e, test_mode, context, session=session)
                   raise
   ```
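   For intuition, here is a plain-Python sketch of the stale-copy race behind the duplicate callback (all names below are made up for illustration, not real Airflow models): when a second in-memory copy of the DagRun is not refreshed before the failure is handled, the failure callback fires a second time.

```python
# Illustrative sketch only; FakeDagRun and the db dict are stand-ins, not
# Airflow APIs. Two processes each hold an in-memory copy of the same
# DagRun row.

db = {"state": "running", "callbacks_fired": 0}  # stand-in for the metadata DB


class FakeDagRun:
    def __init__(self):
        self.state = db["state"]  # snapshot taken when the row was loaded

    def refresh_from_db(self):
        self.state = db["state"]  # pick up changes made by other processes

    def handle_failure(self):
        if self.state == "failed":
            return  # already handled elsewhere, skip the callback
        self.state = db["state"] = "failed"
        db["callbacks_fired"] += 1  # on_failure_callback / listener runs here


# Without a refresh: the second copy is stale, so the callback fires twice.
run_a, run_b = FakeDagRun(), FakeDagRun()
run_a.handle_failure()
run_b.handle_failure()
assert db["callbacks_fired"] == 2

# With a refresh before handling (the idea behind the change above):
# the second copy sees the up-to-date state and the callback fires once.
db.update(state="running", callbacks_fired=0)
run_a, run_b = FakeDagRun(), FakeDagRun()
run_a.handle_failure()
run_b.refresh_from_db()
run_b.handle_failure()
assert db["callbacks_fired"] == 1
```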
   
   > And are you talking about on_error callback on the DAG or on the TI? Assuming if you mark failed it is on TI level and not the DAG itself?
   
   I've been testing it with listeners, but it affects callbacks as well.
   
   
[clear.py.log](https://github.com/user-attachments/files/18165371/clear.py.log)
   
   In it, you can see that the line starting with
   
   ```
   DAG clear_test failed at 2024-12-17 12:23:56.974528+00:00. Run ID: manual__2024-12-17T12:23:56.974528+00:00
   ```
   
   happens twice. This is the DAG I'm testing it with.
   
   ```
   from airflow import DAG
   from airflow.operators.empty import EmptyOperator
   from airflow.providers.standard.operators.python import PythonOperator
   from datetime import datetime
   import time
   
   
   def wait_function():
       time.sleep(10)
   
   
   def notify_failure(context):
       dag_id = context['dag'].dag_id
       run_id = context['run_id']
       logical_date = context['logical_date']
       log_url = context['task_instance'].log_url
       print(f"DAG {dag_id} failed at {logical_date}. Run ID: {run_id}. See logs: {log_url}")
   
   
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'start_date': datetime(2023, 10, 1),
       'email_on_failure': False,
       'email_on_retry': False
   }
   
   with DAG(
       'clear_test',
       default_args=default_args,
       description='A simple example DAG',
       on_failure_callback=notify_failure,
       catchup=False,
   ) as dag:
       start_task = EmptyOperator(
           task_id='start_task',
       )
   
       wait_task = PythonOperator(
           task_id='wait_task',
           python_callable=wait_function,
       )
   
       start_task >> wait_task
   
   ```
   
   


