This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bca0a63ec38 Fix missing TaskInstanceHistory on scheduler TI resets (#59639)
bca0a63ec38 is described below
commit bca0a63ec387c7ddd3265daa8089cce752b3dcfc
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Dec 23 15:00:08 2025 +0100
Fix missing TaskInstanceHistory on scheduler TI resets (#59639)
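When a SchedulerJob is marked failed, its orphaned task instances may be reset
and re-scheduled, which increments try_number without recording the abandoned
attempt.
This change records the current attempt in task_instance_history before
resetting, so users have a complete audit trail of the failed attempt.
The reset also clears the task instance's external_executor_id and calls
clear_next_method_args(), so state from the abandoned attempt is not carried
over to the rescheduled one.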
related #57618
---
.../src/airflow/jobs/scheduler_job_runner.py | 8 ++++++++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 20 ++++++++++++++++++++
2 files changed, 28 insertions(+)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 36ea2fa7419..50c90e15551 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2604,8 +2604,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
reset_tis_message = []
for ti in to_reset:
reset_tis_message.append(repr(ti))
+ # If we reset a TI, it will be eligible to be scheduled again.
+ # This can cause the scheduler to increase the try_number on the TI.
+ # Record the current try to TaskInstanceHistory first so users have an audit trail for
+ # the attempt that was abandoned.
+ ti.prepare_db_for_next_try(session=session)
+
ti.state = None
ti.queued_by_job_id = None
+ ti.external_executor_id = None
+ ti.clear_next_method_args()
for ti in set(tis_to_adopt_or_reset) - set(to_reset):
ti.queued_by_job_id = self.job.id
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 02f3c546fe2..1c2203579f9 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -4138,6 +4138,8 @@ class TestSchedulerJob:
list(sorted(State.adoptable_states)),
)
def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state,
session):
+ from airflow.models.taskinstancehistory import TaskInstanceHistory
+
dag_id = "test_adopt_or_reset_adoptable_tasks_" + adoptable_state.name
with dag_maker(dag_id=dag_id, schedule="@daily"):
task_id = dag_id + "_task"
@@ -4152,6 +4154,8 @@ class TestSchedulerJob:
ti = dr1.get_task_instances(session=session)[0]
ti.state = adoptable_state
ti.queued_by_job_id = old_job.id
+ old_ti_id = ti.id
+ old_try_number = ti.try_number
session.merge(ti)
session.merge(dr1)
session.commit()
@@ -4159,6 +4163,22 @@ class TestSchedulerJob:
num_reset_tis =
self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
assert num_reset_tis == 1
+ ti.refresh_from_db(session=session)
+ assert ti.id != old_ti_id
+ assert (
+ session.scalar(
+ select(TaskInstanceHistory).where(
+ TaskInstanceHistory.dag_id == ti.dag_id,
+ TaskInstanceHistory.task_id == ti.task_id,
+ TaskInstanceHistory.run_id == ti.run_id,
+ TaskInstanceHistory.map_index == ti.map_index,
+ TaskInstanceHistory.try_number == old_try_number,
+ TaskInstanceHistory.task_instance_id == old_ti_id,
+ )
+ )
+ is not None
+ )
+
def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self,
dag_maker, session):
dag_id = "test_reset_orphaned_tasks_external_triggered_dag"
with dag_maker(dag_id=dag_id, schedule="@daily"):