YoannAbriel commented on code in PR #62878:
URL: https://github.com/apache/airflow/pull/62878#discussion_r3035373161
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -1822,10 +1828,53 @@ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32,
session=session)
session.flush()
+ # No tasks should be queued
assert len(res) == 0
+ # Tasks should remain SCHEDULED (not be bulk-failed)
tis = dr.get_task_instances(session=session)
assert len(tis) == 2
- assert all(ti.state == State.FAILED for ti in tis)
+ assert all(ti.state == State.SCHEDULED for ti in tis)
+
+ def test_missing_serialized_dag_does_not_bulk_fail_tasks(self, dag_maker,
session):
Review Comment:
Removed the duplicate test — the existing one covers this.
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -1822,10 +1828,53 @@ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32,
session=session)
session.flush()
+ # No tasks should be queued
assert len(res) == 0
+ # Tasks should remain SCHEDULED (not be bulk-failed)
tis = dr.get_task_instances(session=session)
assert len(tis) == 2
- assert all(ti.state == State.FAILED for ti in tis)
+ assert all(ti.state == State.SCHEDULED for ti in tis)
+
+ def test_missing_serialized_dag_does_not_bulk_fail_tasks(self, dag_maker,
session):
+ """Regression test for https://github.com/apache/airflow/issues/62050
+
+ When the serialized DAG is transiently missing, all SCHEDULED task
instances for the DAG
+ should remain SCHEDULED so the scheduler can pick them up in the next
iteration.
+ Previously, the scheduler would bulk-fail all SCHEDULED tasks when it
couldn't find the
+ serialized DAG in the DagBag.
+ """
+ dag_id = "SchedulerJobTest.test_missing_serialized_dag_bulk_fails"
+
+ with dag_maker(dag_id=dag_id, session=session,
default_args={"max_active_tis_per_dag": 2}):
+ EmptyOperator(task_id="task_a")
+ EmptyOperator(task_id="task_b")
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+ # Simulate serialized DAG being transiently missing
+ self.job_runner.scheduler_dag_bag = mock.MagicMock()
Review Comment:
Switched to spec=DBDagBag.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]