This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c83341880da512a0f7d403def769a2a2dab4f1c6
Author: Kalyan R <[email protected]>
AuthorDate: Wed Sep 17 07:46:58 2025 +0530

    fix unintended print of dag not found in serialised_dag table (#54972)
    
    * fix
    
    * raise exception
    
    (cherry picked from commit eec40ebefeb25e0c7fbd794eef4a16381acab78b)
---
 airflow-core/src/airflow/jobs/scheduler_job_runner.py | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 95c4395e537..e718411c1bd 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -48,6 +48,7 @@ from airflow.callbacks.callback_requests import (
 )
 from airflow.configuration import conf
 from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
+from airflow.exceptions import DagNotFound
 from airflow.executors import workloads
 from airflow.jobs.base_job_runner import BaseJobRunner
 from airflow.jobs.job import Job, JobState, perform_heartbeat
@@ -908,13 +909,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 # Get task from the Serialized DAG
                 try:
                     dag = scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session)
-                    cls.logger().error(
-                        "DAG '%s' for task instance %s not found in serialized_dag table",
-                        ti.dag_id,
-                        ti,
-                    )
-                    if TYPE_CHECKING:
-                        assert dag
+                    if not dag:
+                        cls.logger().error(
+                            "DAG '%s' for task instance %s not found in serialized_dag table",
+                            ti.dag_id,
+                            ti,
+                        )
+                        raise DagNotFound(f"DAG '{ti.dag_id}' not found in serialized_dag table")
+
                     task = dag.get_task(ti.task_id)
                 except Exception:
                     cls.logger().exception("Marking task instance %s as %s", ti, state)

Reply via email to