This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 90d6332bd15 Double-check TaskInstance state if it differs from the 
Executor state. (#43063)
90d6332bd15 is described below

commit 90d6332bd15521c0ecb7f1e760056ac317e2b940
Author: Antony Southworth 
<[email protected]>
AuthorDate: Tue Dec 10 03:49:31 2024 +1300

    Double-check TaskInstance state if it differs from the Executor state. 
(#43063)
    
    * Double-check TaskInstance state if it differs from Executor.
    
    * Update airflow/jobs/backfill_job_runner.py
    
    * Update airflow/jobs/backfill_job_runner.py
    
    * Update airflow/jobs/backfill_job_runner.py
    
    * Update airflow/jobs/backfill_job_runner.py
    
    * Update airflow/jobs/backfill_job_runner.py
    
    * Update airflow/jobs/backfill_job_runner.py
    
    ---------
    
    Co-authored-by: Utkarsh Sharma <[email protected]>
---
 airflow/jobs/backfill_job_runner.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/airflow/jobs/backfill_job_runner.py 
b/airflow/jobs/backfill_job_runner.py
index 961c4b7e020..305eaff84be 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -309,6 +309,17 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
 
             self.log.debug("Executor state: %s task %s", state, ti)
 
+            if (
+                state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS)
+                and ti.state in self.STATES_COUNT_AS_RUNNING
+            ):
+                self.log.debug(
+                    "In-memory TaskInstance state %s does not agree with 
executor state %s. Attempting to resolve by refreshing in-memory task instance 
from DB.",
+                    ti,
+                    state,
+                )
+                ti.refresh_from_db(session=session)
+
             if (
                 state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS)
                 and ti.state in self.STATES_COUNT_AS_RUNNING

Reply via email to