This is an automated email from the ASF dual-hosted git repository.
utkarsharma pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 90d6332bd15 Double-check TaskInstance state if it differs from the Executor state. (#43063)
90d6332bd15 is described below
commit 90d6332bd15521c0ecb7f1e760056ac317e2b940
Author: Antony Southworth <[email protected]>
AuthorDate: Tue Dec 10 03:49:31 2024 +1300
Double-check TaskInstance state if it differs from the Executor state. (#43063)
* Double-check TaskInstance state if it differs from Executor.
* Update airflow/jobs/backfill_job_runner.py
* Update airflow/jobs/backfill_job_runner.py
* Update airflow/jobs/backfill_job_runner.py
* Update airflow/jobs/backfill_job_runner.py
* Update airflow/jobs/backfill_job_runner.py
* Update airflow/jobs/backfill_job_runner.py
---------
Co-authored-by: Utkarsh Sharma <[email protected]>
---
airflow/jobs/backfill_job_runner.py | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index 961c4b7e020..305eaff84be 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -309,6 +309,17 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
self.log.debug("Executor state: %s task %s", state, ti)
+ if (
+ state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS)
+ and ti.state in self.STATES_COUNT_AS_RUNNING
+ ):
+ self.log.debug(
+ "In-memory TaskInstance state %s does not agree with executor state %s. Attempting to resolve by refreshing in-memory task instance from DB.",
+ ti,
+ state,
+ )
+ ti.refresh_from_db(session=session)
+
if (
state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS)
and ti.state in self.STATES_COUNT_AS_RUNNING