This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 15aeed9a8ae Rename dag to serdag where appropriate in scheduler 
(#59118)
15aeed9a8ae is described below

commit 15aeed9a8aeafae0a6a6bbc5355f1f52dc30c2c9
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Dec 5 20:59:45 2025 -0800

    Rename dag to serdag where appropriate in scheduler (#59118)
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 36 +++++++++++-----------
 1 file changed, 18 insertions(+), 18 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 2ae008026c7..a7fd0049810 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1761,7 +1761,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         # Bulk Fetch DagRuns with dag_id and logical_date same
         # as DagModel.dag_id and DagModel.next_dagrun
         # This list is used to verify if the DagRun already exist so that we 
don't attempt to create
-        # duplicate dag runs
+        # duplicate DagRuns
         existing_dagruns = (
             session.execute(
                 select(DagRun.dag_id, DagRun.logical_date).where(
@@ -1786,25 +1786,25 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         )
 
         for dag_model in dag_models:
-            dag = _get_current_dag(dag_id=dag_model.dag_id, session=session)
-            if not dag:
+            serdag = _get_current_dag(dag_id=dag_model.dag_id, session=session)
+            if not serdag:
                 self.log.error("DAG '%s' not found in serialized_dag table", 
dag_model.dag_id)
                 continue
 
-            data_interval = get_next_data_interval(dag.timetable, dag_model)
+            data_interval = get_next_data_interval(serdag.timetable, dag_model)
             # Explicitly check if the DagRun already exists. This is an edge 
case
             # where a Dag Run is created but `DagModel.next_dagrun` and 
`DagModel.next_dagrun_create_after`
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to set dag.next_dagrun_info if the Dag Run already 
exists or if we
-            # create a new one. This is so that in the next Scheduling loop we 
try to create new runs
-            # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+            # we need to set DagModel.next_dagrun_info if the DagRun already 
exists or if we
+            # create a new one. This is so that in the next scheduling loop we 
try to create new runs
+            # instead of falling in a loop of IntegrityError.
+            if (serdag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
                 try:
                     if dag_model.next_dagrun is not None and 
dag_model.next_dagrun_create_after is not None:
-                        dag.create_dagrun(
-                            run_id=dag.timetable.generate_run_id(
+                        serdag.create_dagrun(
+                            run_id=serdag.timetable.generate_run_id(
                                 run_type=DagRunType.SCHEDULED,
                                 
run_after=timezone.coerce_datetime(dag_model.next_dagrun),
                                 data_interval=data_interval,
@@ -1818,28 +1818,28 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                             creating_job_id=self.job.id,
                             session=session,
                         )
-                        active_runs_of_dags[dag.dag_id] += 1
+                        active_runs_of_dags[serdag.dag_id] += 1
                     else:
                         if dag_model.next_dagrun is None:
                             raise ValueError("dag_model.next_dagrun is None; 
expected datetime")
                         raise ValueError("dag_model.next_dagrun_create_after 
is None; expected datetime")
                 # Exceptions like ValueError, ParamValidationError, etc. are 
raised by
-                # dag.create_dagrun() when dag is misconfigured. The scheduler 
should not
+                # DagModel.create_dagrun() when dag is misconfigured. The 
scheduler should not
                 # crash due to misconfigured dags. We should log any exception 
encountered
-                # and continue to the next dag.
+                # and continue to the next serdag.
                 except Exception:
-                    self.log.exception("Failed creating DagRun for %s", 
dag.dag_id)
+                    self.log.exception("Failed creating DagRun for %s", 
serdag.dag_id)
                     continue
             if self._should_update_dag_next_dagruns(
-                dag,
+                serdag,
                 dag_model,
                 last_dag_run=None,
-                active_non_backfill_runs=active_runs_of_dags[dag.dag_id],
+                active_non_backfill_runs=active_runs_of_dags[serdag.dag_id],
                 session=session,
             ):
-                dag_model.calculate_dagrun_date_fields(dag, data_interval)
+                dag_model.calculate_dagrun_date_fields(serdag, data_interval)
         # TODO[HA]: Should we do a session.flush() so we don't have to keep 
lots of state/object in
-        # memory for larger dags? or expunge_all()
+        #  memory for larger dags? or expunge_all()
 
     def _create_dag_runs_asset_triggered(
         self,

Reply via email to