This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c83341880da512a0f7d403def769a2a2dab4f1c6
Author: Kalyan R <[email protected]>
AuthorDate: Wed Sep 17 07:46:58 2025 +0530

    fix unintended print of dag not found in serialised_dag table (#54972)

    * fix

    * raise exception

    (cherry picked from commit eec40ebefeb25e0c7fbd794eef4a16381acab78b)
---
 airflow-core/src/airflow/jobs/scheduler_job_runner.py | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 95c4395e537..e718411c1bd 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -48,6 +48,7 @@ from airflow.callbacks.callback_requests import (
 )
 from airflow.configuration import conf
 from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
+from airflow.exceptions import DagNotFound
 from airflow.executors import workloads
 from airflow.jobs.base_job_runner import BaseJobRunner
 from airflow.jobs.job import Job, JobState, perform_heartbeat
@@ -908,13 +909,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             # Get task from the Serialized DAG
             try:
                 dag = scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session)
-                cls.logger().error(
-                    "DAG '%s' for task instance %s not found in serialized_dag table",
-                    ti.dag_id,
-                    ti,
-                )
-                if TYPE_CHECKING:
-                    assert dag
+                if not dag:
+                    cls.logger().error(
+                        "DAG '%s' for task instance %s not found in serialized_dag table",
+                        ti.dag_id,
+                        ti,
+                    )
+                    raise DagNotFound(f"DAG '{ti.dag_id}' not found in serialized_dag table")
+
                 task = dag.get_task(ti.task_id)
             except Exception:
                 cls.logger().exception("Marking task instance %s as %s", ti, state)
