This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 98618f0c4f Fix Scheduler crash looping when dagrun creation fails (#35135)
98618f0c4f is described below
commit 98618f0c4f1dfd4718741291c29d4adb6f0c0140
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Oct 23 19:39:37 2023 +0100
Fix Scheduler crash looping when dagrun creation fails (#35135)
---
 airflow/jobs/scheduler_job_runner.py | 30 +++++++++++++++++++-----------
 tests/jobs/test_scheduler_job.py     | 25 +++++++++++++++++++++++++
 2 files changed, 44 insertions(+), 11 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 41c8714b49..e05ad770a4 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1176,17 +1176,25 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
             if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
-                dag.create_dagrun(
-                    run_type=DagRunType.SCHEDULED,
-                    execution_date=dag_model.next_dagrun,
-                    state=DagRunState.QUEUED,
-                    data_interval=data_interval,
-                    external_trigger=False,
-                    session=session,
-                    dag_hash=dag_hash,
-                    creating_job_id=self.job.id,
-                )
-                active_runs_of_dags[dag.dag_id] += 1
+                try:
+                    dag.create_dagrun(
+                        run_type=DagRunType.SCHEDULED,
+                        execution_date=dag_model.next_dagrun,
+                        state=DagRunState.QUEUED,
+                        data_interval=data_interval,
+                        external_trigger=False,
+                        session=session,
+                        dag_hash=dag_hash,
+                        creating_job_id=self.job.id,
+                    )
+                    active_runs_of_dags[dag.dag_id] += 1
+                # Exceptions like ValueError, ParamValidationError, etc. are raised by
+                # dag.create_dagrun() when dag is misconfigured. The scheduler should not
+                # crash due to misconfigured dags. We should log any exception encountered
+                # and continue to the next dag.
+                except Exception:
+                    self.log.exception("Failed creating DagRun for %s", dag.dag_id)
+                    continue
             if self._should_update_dag_next_dagruns(
                 dag,
                 dag_model,
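
The hunk above moves run creation into a per-DAG try/except: a DAG whose run cannot be created is logged and skipped, so the scheduler no longer crash-loops on it. As a rough illustration only (hypothetical names create_runs and make_run; not the actual Airflow code), the log-and-skip pattern looks like this:

    import logging

    log = logging.getLogger(__name__)

    def create_runs(dag_ids, make_run):
        """Attempt run creation for each DAG; log and skip failures instead of raising."""
        created = []
        for dag_id in dag_ids:
            try:
                created.append(make_run(dag_id))
            except Exception:
                # A single misconfigured DAG (e.g. a ValueError raised while
                # building its run) must not abort the whole scheduling pass.
                log.exception("Failed creating DagRun for %s", dag_id)
                continue
        return created

Catching the broad Exception is deliberate here: any failure mode of a single DAG is contained, while the full traceback is preserved in the scheduler logs via log.exception.
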
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 6f3085bbe2..5e83c77a64 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -5117,6 +5117,31 @@ class TestSchedulerJob:
         ]
         assert orphaned_datasets == ["ds2", "ds4"]
 
+    def test_misconfigured_dags_doesnt_crash_scheduler(self, session, dag_maker, caplog):
+        """Test that if dagrun creation throws an exception, the scheduler doesn't crash"""
+
+        with dag_maker("testdag1", serialized=True):
+            BashOperator(task_id="task", bash_command="echo 1")
+
+        dm1 = dag_maker.dag_model
+        # Here, the next_dagrun is set to None, which will cause an exception
+        dm1.next_dagrun = None
+        session.add(dm1)
+        session.flush()
+
+        with dag_maker("testdag2", serialized=True):
+            BashOperator(task_id="task", bash_command="echo 1")
+        dm2 = dag_maker.dag_model
+
+        scheduler_job = Job()
+        job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+        # In the dagmodel list, the first dag should fail, but the second one should succeed
+        job_runner._create_dag_runs([dm1, dm2], session)
+        assert "Failed creating DagRun for testdag1" in caplog.text
+        assert not DagRun.find(dag_id="testdag1", session=session)
+        # Check if the second dagrun was created
+        assert DagRun.find(dag_id="testdag2", session=session)
+
 
 @pytest.mark.need_serialized_dag
 def test_schedule_dag_run_with_upstream_skip(dag_maker, session):
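
The new test drives the scenario end to end: the first DagModel has next_dagrun set to None so run creation fails, and the assertions check that the failure is logged (via pytest's caplog fixture) while the second DAG still gets its run. A self-contained sketch of the same assertion style, outside Airflow and not part of this commit (the test name, logger name, and make_run helper are made up):

    import logging

    log = logging.getLogger("dagrun_sketch")

    def test_one_bad_dag_does_not_stop_the_loop(caplog):
        """The bad DAG is logged and skipped; the good DAG still gets a run."""
        def make_run(dag_id):
            if dag_id == "testdag1":
                raise ValueError("misconfigured dag")
            return f"run-{dag_id}"

        created = []
        with caplog.at_level(logging.ERROR):
            for dag_id in ["testdag1", "testdag2"]:
                try:
                    created.append(make_run(dag_id))
                except Exception:
                    log.exception("Failed creating DagRun for %s", dag_id)

        assert "Failed creating DagRun for testdag1" in caplog.text
        assert created == ["run-testdag2"]

Run it with pytest; caplog is a built-in pytest fixture, so no extra plugins are needed.
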