This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 1cf26e55388 [v3-1-test] Add logging to detect try number race (#62703)
(#62821)
1cf26e55388 is described below
commit 1cf26e55388876cdbf92b77ae276a73fb686e172
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Mar 4 10:51:23 2026 +0100
[v3-1-test] Add logging to detect try number race (#62703) (#62821)
* Add logging to detect try number race (#62703)
* Log try_number mismatches during TI scheduling for HA race diagnosis
This adds more logging to select places where a try_number mismatch
could happen, and will help us detect and fix the issue.
Related: https://github.com/apache/airflow/issues/57618
* Add tests
(cherry picked from commit 95784d9f2d4edb5bf3a64ee44c1fa8264a8618df)
* fixup! Add logging to detect try number race (#62703)
* fixup! fixup! Add logging to detect try number race (#62703)
---
.../src/airflow/jobs/scheduler_job_runner.py | 58 ++++++++++++++--
airflow-core/src/airflow/models/dagrun.py | 43 ++++++++++++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 44 +++++++++++-
airflow-core/tests/unit/models/test_dagrun.py | 78 ++++++++++++++++++++++
4 files changed, 216 insertions(+), 7 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 53baf765bb2..c66be676137 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import itertools
+import logging
import multiprocessing
import operator
import os
@@ -72,7 +73,7 @@ from airflow.models.dagbag import DBDagBag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger,
TriggerFailureReason
from airflow.stats import Stats
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
@@ -99,7 +100,6 @@ if TYPE_CHECKING:
from airflow._shared.logging.types import Logger
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.executor_utils import ExecutorName
- from airflow.models.taskinstance import TaskInstanceKey
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils.sqlalchemy import CommitProhibitorGuard
@@ -655,8 +655,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
Stats.gauge("scheduler.tasks.executable", len(executable_tis))
if executable_tis:
- task_instance_str = "\n".join(f"\t{x!r}" for x in executable_tis)
- self.log.info("Setting the following tasks to queued state:\n%s",
task_instance_str)
+ task_instance_str = "\n".join(
+ f"\t{x!r} (id={x.id}, try_number={x.try_number})" for x in
executable_tis
+ )
+ self.log.info(
+ "Setting the following tasks to queued state (scheduler
job_id=%s):\n%s",
+ self.job.id,
+ task_instance_str,
+ )
# set TIs to queued state
filter_for_tis = TI.filter_for_tis(executable_tis)
@@ -702,7 +708,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
ti,
)
continue
-
+ self.log.debug(
+ "Queueing workload for TI: %s id=%s try_number=%d state=%s
scheduler_job_id=%s executor=%s",
+ ti,
+ ti.id,
+ ti.try_number,
+ ti.state,
+ self.job.id,
+ executor,
+ )
workload = workloads.ExecuteTask.make(ti,
generator=executor.jwt_generator)
executor.queue_workload(workload, session=session)
@@ -823,6 +837,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# Report execution
for ti_key, (state, _) in event_buffer.items():
+ if isinstance(ti_key, TaskInstanceKey):
+ existing_try =
ti_primary_key_to_try_number_map.get(ti_key.primary)
+ if existing_try is not None and existing_try !=
ti_key.try_number:
+ cls.logger().warning(
+ "Multiple executor events for same TI with different
try_numbers! "
+ "primary_key=%s existing_try_number=%d
new_try_number=%d new_state=%s. ",
+ ti_key.primary,
+ existing_try,
+ ti_key.try_number,
+ state,
+ )
# We create map (dag_id, task_id, logical_date) -> in-memory
try_number
ti_primary_key_to_try_number_map[ti_key.primary] =
ti_key.try_number
@@ -858,6 +883,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
for ti in tis:
try_number = ti_primary_key_to_try_number_map[ti.key.primary]
buffer_key = ti.key.with_try_number(try_number)
+ if ti.try_number != try_number:
+ cls.logger().warning(
+ "TI try_number mismatch: db_try_number=%d
event_try_number=%d "
+ "ti=%s ti_id=%s state=%s job_id=%s. "
+ "Another scheduler may have already modified this TI.",
+ ti.try_number,
+ try_number,
+ ti,
+ ti.id,
+ ti.state,
+ job_id,
+ )
state, info = event_buffer.pop(buffer_key)
if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
@@ -2070,6 +2107,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
"schedulable_tis": [_ti.task_id for _ti in
schedulable_tis],
},
)
+ if schedulable_tis and self.log.isEnabledFor(logging.DEBUG):
+ self.log.debug(
+ "Scheduling TIs for dag_run=%s/%s (scheduler job_id=%s):
%s",
+ dag_run.dag_id,
+ dag_run.run_id,
+ self.job.id,
+ [
+ f"{ti.task_id} (id={ti.id}, state={ti.state},
try_number={ti.try_number})"
+ for ti in schedulable_tis
+ ],
+ )
dag_run.schedule_tis(schedulable_tis, session,
max_tis_per_query=self.job.max_tis_per_query)
return callback_to_run
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index 39fad84be6a..0471696d59e 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import itertools
+import logging
import os
import re
from collections import defaultdict
@@ -2003,6 +2004,8 @@ class DagRun(Base, LoggingMixin):
# tasks using EmptyOperator and without on_execute_callback /
on_success_callback
empty_ti_ids: list[str] = []
schedulable_ti_ids: list[str] = []
+ debug_try_number_check = self.log.isEnabledFor(logging.DEBUG)
+ expected_try_number_by_ti_id: dict[str, tuple[int, int, str | None]] =
{}
for ti in schedulable_tis:
task = ti.task
if TYPE_CHECKING:
@@ -2034,6 +2037,14 @@ class DagRun(Base, LoggingMixin):
# schedulable_ti_ids.append(ti.id)
else:
schedulable_ti_ids.append(ti.id)
+ if debug_try_number_check:
+ expected_try_number_by_ti_id[ti.id] = (
+ ti.try_number
+ if ti.state == TaskInstanceState.UP_FOR_RESCHEDULE
+ else ti.try_number + 1,
+ ti.try_number,
+ ti.state,
+ )
count = 0
@@ -2058,6 +2069,38 @@ class DagRun(Base, LoggingMixin):
)
.execution_options(synchronize_session=False)
).rowcount
+ if debug_try_number_check:
+ rows = session.execute(
+ select(TI.id, TI.try_number,
TI.state).where(TI.id.in_(id_chunk))
+ ).all()
+ rows_by_ti_id = {
+ ti_id: (db_try_number, db_state) for ti_id,
db_try_number, db_state in rows
+ }
+ for ti_id in id_chunk:
+ expected = expected_try_number_by_ti_id.get(ti_id)
+ if expected is None:
+ continue
+ db_row = rows_by_ti_id.get(ti_id)
+ if db_row is None:
+ continue
+ expected_try_number, pre_update_try_number,
pre_update_state = expected
+ db_try_number, db_state = db_row
+ if db_try_number != expected_try_number:
+ self.log.warning(
+ "schedule_tis: try_number mismatch after
scheduling for ti_id=%s "
+ "dag_run=%s/%s scheduler_job_id=%s "
+ "pre_state=%s pre_try_number=%d
expected_try_number=%d "
+ "db_state=%s db_try_number=%d",
+ ti_id,
+ self.dag_id,
+ self.run_id,
+ self.scheduled_by_job_id,
+ pre_update_state,
+ pre_update_try_number,
+ expected_try_number,
+ db_state,
+ db_try_number,
+ )
# Tasks using EmptyOperator should not be executed, mark them as
success
if empty_ti_ids:
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 8fac72ea3ee..d9cc4e0a4e5 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -552,7 +552,9 @@ class TestSchedulerJob:
@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
- def test_process_executor_events_ti_requeued(self, mock_stats_incr,
mock_task_callback, dag_maker):
+ def test_process_executor_events_ti_requeued(
+ self, mock_stats_incr, mock_task_callback, dag_maker, caplog
+ ):
dag_id = "test_process_executor_events_ti_requeued"
task_id_1 = "dummy_task"
@@ -580,10 +582,12 @@ class TestSchedulerJob:
executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None
- self.job_runner._process_executor_events(executor=executor,
session=session)
+ with caplog.at_level(logging.WARNING,
logger="airflow.jobs.scheduler_job_runner"):
+ self.job_runner._process_executor_events(executor=executor,
session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
scheduler_job.executor.callback_sink.send.assert_not_called()
+ assert any("TI try_number mismatch:" in rec.message for rec in
caplog.records)
# ti is queued by another scheduler - do not fail it
ti1.state = State.QUEUED
@@ -613,6 +617,42 @@ class TestSchedulerJob:
scheduler_job.executor.callback_sink.send.assert_not_called()
mock_stats_incr.assert_not_called()
+ @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
+ @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
+ def test_process_executor_events_multiple_try_numbers_warns(
+ self, mock_stats_incr, mock_task_callback, dag_maker, caplog
+ ):
+ dag_id = "test_process_executor_events_multiple_try_numbers_warns"
+ task_id = "dummy_task"
+
+ session = settings.Session()
+ with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
+ task = EmptyOperator(task_id=task_id)
+ ti = dag_maker.create_dagrun().get_task_instance(task.task_id)
+
+ executor = MockExecutor(do_update=False)
+ scheduler_job = Job(executor=executor)
+ self.job_runner = SchedulerJobRunner(scheduler_job)
+ mock_stats_incr.reset_mock()
+
+ ti.state = State.QUEUED
+ ti.try_number = 2
+ session.merge(ti)
+ session.commit()
+
+ executor.event_buffer[ti.key.with_try_number(1)] = State.RUNNING,
"first_executor_id"
+ executor.event_buffer[ti.key.with_try_number(2)] = State.RUNNING,
"second_executor_id"
+
+ with caplog.at_level(logging.WARNING,
logger="airflow.jobs.scheduler_job_runner"):
+ self.job_runner._process_executor_events(executor=executor,
session=session)
+
+ assert any(
+ "Multiple executor events for same TI with different try_numbers!"
in rec.message
+ for rec in caplog.records
+ )
+ mock_task_callback.assert_not_called()
+ mock_stats_incr.assert_not_called()
+
@pytest.mark.usefixtures("testing_dag_bundle")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_events_with_asset_events(self, mock_stats_incr,
session, dag_maker):
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index 42d6af6bad3..0194aa059f4 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -2095,6 +2095,84 @@ def
test_schedule_tis_empty_operator_try_number(dag_maker, session: Session):
assert empty_ti.try_number == 1
+def test_schedule_tis_try_number_mismatch_logs_warning(dag_maker, session:
Session, monkeypatch):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ original_execute = session.execute
+
+ class _FakeSelectResult:
+ def all(self):
+ return [(ti.id, ti.try_number + 2, TaskInstanceState.SCHEDULED)]
+
+ def execute_with_mismatch(statement, *args, **kwargs):
+ if getattr(statement, "is_select", False):
+ return _FakeSelectResult()
+ return original_execute(statement, *args, **kwargs)
+
+ monkeypatch.setattr(session, "execute", execute_with_mismatch)
+
+ with (
+ mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+ mock.patch.object(dr.log, "warning") as warning_mock,
+ ):
+ dr.schedule_tis((ti,), session=session)
+
+ assert any(
+ "schedule_tis: try_number mismatch after scheduling" in call.args[0]
+ for call in warning_mock.call_args_list
+ )
+
+
+def test_schedule_tis_try_number_match_has_no_warning(dag_maker, session:
Session):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ with (
+ mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+ mock.patch.object(dr.log, "warning") as warning_mock,
+ ):
+ dr.schedule_tis((ti,), session=session)
+
+ assert all(
+ "schedule_tis: try_number mismatch after scheduling" not in
call.args[0]
+ for call in warning_mock.call_args_list
+ )
+
+
+def test_schedule_tis_try_number_check_is_debug_only(dag_maker, session:
Session, monkeypatch):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ original_execute = session.execute
+ select_calls = 0
+
+ def execute_with_counter(statement, *args, **kwargs):
+ nonlocal select_calls
+ if getattr(statement, "is_select", False):
+ select_calls += 1
+ return original_execute(statement, *args, **kwargs)
+
+ monkeypatch.setattr(session, "execute", execute_with_counter)
+
+ with mock.patch.object(dr.log, "isEnabledFor", return_value=False):
+ dr.schedule_tis((ti,), session=session)
+
+ assert select_calls == 0
+
+
@pytest.mark.xfail(reason="We can't keep this behaviour with remote workers
where scheduler can't reach xcom")
def test_schedule_tis_start_trigger_through_expand(dag_maker, session):
"""