kaxil commented on code in PR #62878:
URL: https://github.com/apache/airflow/pull/62878#discussion_r2934018871
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -1822,10 +1828,53 @@ def
test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32,
session=session)
session.flush()
+ # No tasks should be queued
assert len(res) == 0
+ # Tasks should remain SCHEDULED (not be bulk-failed)
tis = dr.get_task_instances(session=session)
assert len(tis) == 2
- assert all(ti.state == State.FAILED for ti in tis)
+ assert all(ti.state == State.SCHEDULED for ti in tis)
+
+ def test_missing_serialized_dag_does_not_bulk_fail_tasks(self, dag_maker,
session):
Review Comment:
This test covers the same scenario as
`test_queued_task_instances_skips_with_missing_dag` above (mock
`get_dag_for_run` to return None, assert tasks stay SCHEDULED). The only
differences are the dag_id string, the `max_active_tis_per_dag` value (1 vs 2),
and the assertion style. I'd keep one of the two tests and delete the other rather than maintaining both.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -770,19 +770,17 @@ def _executable_task_instances_to_queued(self, max_tis:
int, session: Session) -
serialized_dag = self.scheduler_dag_bag.get_dag_for_run(
dag_run=task_instance.dag_run, session=session
)
- # If the dag is missing, fail the task and continue to the
next task.
+ # If the dag is transiently missing, skip scheduling it
this iteration
+ # and try again next time instead of bulk-failing all
scheduled tasks.
+ # See: https://github.com/apache/airflow/issues/62050
if not serialized_dag:
- self.log.error(
- "DAG '%s' for task instance %s not found in
serialized_dag table",
+ self.log.warning(
+ "DAG '%s' for task instance %s not found in
serialized_dag table, "
+ "skipping scheduling for this iteration and will
retry next time",
dag_id,
task_instance,
)
- session.execute(
- update(TI)
- .where(TI.dag_id == dag_id, TI.state ==
TaskInstanceState.SCHEDULED)
- .values(state=TaskInstanceState.FAILED)
- .execution_options(synchronize_session="fetch")
- )
+ starved_dags.add(dag_id)
Review Comment:
If a DAG is permanently deleted (not just transiently missing), tasks will
stay SCHEDULED forever and this warning will fire every scheduler iteration.
Would it be worth tracking consecutive misses per `dag_id` and escalating the
task instances to FAILED after N consecutive misses? The issue thread mentioned this approach too.
minimum, this should probably be documented as a known limitation in the PR
description so follow-up work can address it.
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -1822,10 +1828,53 @@ def
test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32,
session=session)
session.flush()
+ # No tasks should be queued
assert len(res) == 0
+ # Tasks should remain SCHEDULED (not be bulk-failed)
tis = dr.get_task_instances(session=session)
assert len(tis) == 2
- assert all(ti.state == State.FAILED for ti in tis)
+ assert all(ti.state == State.SCHEDULED for ti in tis)
+
+ def test_missing_serialized_dag_does_not_bulk_fail_tasks(self, dag_maker,
session):
+ """Regression test for https://github.com/apache/airflow/issues/62050
+
+ When the serialized DAG is transiently missing, all SCHEDULED task
instances for the DAG
+ should remain SCHEDULED so the scheduler can pick them up in the next
iteration.
+ Previously, the scheduler would bulk-fail all SCHEDULED tasks when it
couldn't find the
+ serialized DAG in the DagBag.
+ """
+ dag_id = "SchedulerJobTest.test_missing_serialized_dag_bulk_fails"
+
+ with dag_maker(dag_id=dag_id, session=session,
default_args={"max_active_tis_per_dag": 2}):
+ EmptyOperator(task_id="task_a")
+ EmptyOperator(task_id="task_b")
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+ # Simulate serialized DAG being transiently missing
+ self.job_runner.scheduler_dag_bag = mock.MagicMock()
Review Comment:
`mock.MagicMock()` without `spec` won't catch typos if someone later renames
`get_dag_for_run`. Consider `mock.MagicMock(spec=DBDagBag)` (same applies to
the existing test on line 1813, but that's pre-existing).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]