This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bca0a63ec38 Fix missing TaskInstanceHistory on scheduler TI resets (#59639)
bca0a63ec38 is described below

commit bca0a63ec387c7ddd3265daa8089cce752b3dcfc
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Dec 23 15:00:08 2025 +0100

    Fix missing TaskInstanceHistory on scheduler TI resets (#59639)
    
    When a SchedulerJob is marked failed, orphaned task instances may be reset
    and re-scheduled, incrementing try_number without recording the abandoned
    attempt.
    
    This change records the current attempt into task_instance_history
    before resetting so users have a complete audit trail of the failure.
    
    related #57618
---
 .../src/airflow/jobs/scheduler_job_runner.py         |  8 ++++++++
 airflow-core/tests/unit/jobs/test_scheduler_job.py   | 20 ++++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 36ea2fa7419..50c90e15551 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2604,8 +2604,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     reset_tis_message = []
                     for ti in to_reset:
                         reset_tis_message.append(repr(ti))
+                        # If we reset a TI, it will be eligible to be scheduled again.
+                        # This can cause the scheduler to increase the try_number on the TI.
+                        # Record the current try to TaskInstanceHistory first so users have an audit trail for
+                        # the attempt that was abandoned.
+                        ti.prepare_db_for_next_try(session=session)
+
                         ti.state = None
                         ti.queued_by_job_id = None
+                        ti.external_executor_id = None
+                        ti.clear_next_method_args()
 
                     for ti in set(tis_to_adopt_or_reset) - set(to_reset):
                         ti.queued_by_job_id = self.job.id
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 02f3c546fe2..1c2203579f9 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -4138,6 +4138,8 @@ class TestSchedulerJob:
         list(sorted(State.adoptable_states)),
     )
     def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, 
session):
+        from airflow.models.taskinstancehistory import TaskInstanceHistory
+
         dag_id = "test_adopt_or_reset_adoptable_tasks_" + adoptable_state.name
         with dag_maker(dag_id=dag_id, schedule="@daily"):
             task_id = dag_id + "_task"
@@ -4152,6 +4154,8 @@ class TestSchedulerJob:
         ti = dr1.get_task_instances(session=session)[0]
         ti.state = adoptable_state
         ti.queued_by_job_id = old_job.id
+        old_ti_id = ti.id
+        old_try_number = ti.try_number
         session.merge(ti)
         session.merge(dr1)
         session.commit()
@@ -4159,6 +4163,22 @@ class TestSchedulerJob:
         num_reset_tis = 
self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
         assert num_reset_tis == 1
 
+        ti.refresh_from_db(session=session)
+        assert ti.id != old_ti_id
+        assert (
+            session.scalar(
+                select(TaskInstanceHistory).where(
+                    TaskInstanceHistory.dag_id == ti.dag_id,
+                    TaskInstanceHistory.task_id == ti.task_id,
+                    TaskInstanceHistory.run_id == ti.run_id,
+                    TaskInstanceHistory.map_index == ti.map_index,
+                    TaskInstanceHistory.try_number == old_try_number,
+                    TaskInstanceHistory.task_instance_id == old_ti_id,
+                )
+            )
+            is not None
+        )
+
     def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self, 
dag_maker, session):
         dag_id = "test_reset_orphaned_tasks_external_triggered_dag"
         with dag_maker(dag_id=dag_id, schedule="@daily"):

Reply via email to