This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 5a2b2404cfe [v3-1-test] Add guardrail to handle DAG deserialization 
errors in scheduler (#61162) (#61210)
5a2b2404cfe is described below

commit 5a2b2404cfe21d912b801925210fb507d0412bf4
Author: Rahul Vats <[email protected]>
AuthorDate: Thu Jan 29 22:09:12 2026 +0530

    [v3-1-test] Add guardrail to handle DAG deserialization errors in scheduler 
(#61162) (#61210)
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 23 ++++++++-------
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 33 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 237aacde975..13729013418 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -136,14 +136,6 @@ def _eager_load_dag_run_for_validation() -> tuple[Load, 
Load]:
     )
 
 
-def _get_current_dag(dag_id: str, session: Session) -> SerializedDAG | None:
-    serdag = SerializedDagModel.get(dag_id=dag_id, session=session)  # grabs 
the latest version
-    if not serdag:
-        return None
-    serdag.load_op_links = False
-    return serdag.dag
-
-
 class ConcurrencyMap:
     """
     Dataclass to represent concurrency maps.
@@ -248,6 +240,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
     def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
         Stats.incr("scheduler_heartbeat", 1, 1)
 
+    def _get_current_dag(self, dag_id: str, session: Session) -> SerializedDAG 
| None:
+        try:
+            serdag = SerializedDagModel.get(dag_id=dag_id, session=session)
+            if not serdag:
+                return None
+            serdag.load_op_links = False
+            return serdag.dag
+        except Exception:
+            self.log.exception("Failed to deserialize DAG '%s'", dag_id)
+            return None
+
     def register_signals(self) -> ExitStack:
         """Register signals that stop child processes."""
         resetter = ExitStack()
@@ -1601,7 +1604,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         )
 
         for dag_model in dag_models:
-            dag = _get_current_dag(dag_id=dag_model.dag_id, session=session)
+            dag = self._get_current_dag(dag_id=dag_model.dag_id, 
session=session)
             if not dag:
                 self.log.error("DAG '%s' not found in serialized_dag table", 
dag_model.dag_id)
                 continue
@@ -1664,7 +1667,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         }
 
         for dag_model in dag_models:
-            dag = _get_current_dag(dag_id=dag_model.dag_id, session=session)
+            dag = self._get_current_dag(dag_id=dag_model.dag_id, 
session=session)
             if not dag:
                 self.log.error("DAG '%s' not found in serialized_dag table", 
dag_model.dag_id)
                 continue
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index aba109f9d41..430833fb1ca 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -4752,6 +4752,39 @@ class TestSchedulerJob:
             session.delete(sdm)
         session.commit()
 
+    def 
test_scheduler_create_dag_runs_does_not_crash_on_deserialization_error(self, 
caplog, dag_maker):
+        """
+        Test that scheduler._create_dag_runs does not crash when DAG 
deserialization fails.
+        This is a guardrail to ensure the scheduler continues processing other 
DAGs even if
+        one DAG has a deserialization error.
+        """
+        with 
dag_maker(dag_id="test_scheduler_create_dag_runs_deserialization_error"):
+            EmptyOperator(task_id="dummy")
+
+        scheduler_job = Job(executor=self.null_exec)
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        caplog.set_level("FATAL")
+        caplog.clear()
+        with (
+            create_session() as session,
+            caplog.at_level(
+                "ERROR",
+                logger="airflow.jobs.scheduler_job_runner",
+            ),
+            patch(
+                "airflow.models.serialized_dag.SerializedDagModel.get",
+                side_effect=Exception("Simulated deserialization error"),
+            ),
+        ):
+            self.job_runner._create_dag_runs([dag_maker.dag_model], session)
+            scheduler_messages = [
+                record.message for record in caplog.records if record.levelno 
>= logging.ERROR
+            ]
+            assert any("Failed to deserialize DAG" in msg for msg in 
scheduler_messages), (
+                f"Expected deserialization error log, got: 
{scheduler_messages}"
+            )
+
     def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, 
dag_maker, testing_dag_bundle):
         """
         Test that externally triggered Dag Runs should not affect (by 
skipping) next

Reply via email to