This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 15aeed9a8ae Rename dag to serdag where appropriate in scheduler (#59118)
15aeed9a8ae is described below
commit 15aeed9a8aeafae0a6a6bbc5355f1f52dc30c2c9
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Dec 5 20:59:45 2025 -0800
Rename dag to serdag where appropriate in scheduler (#59118)
---
.../src/airflow/jobs/scheduler_job_runner.py | 36 +++++++++++-----------
1 file changed, 18 insertions(+), 18 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 2ae008026c7..a7fd0049810 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1761,7 +1761,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# Bulk Fetch DagRuns with dag_id and logical_date same
# as DagModel.dag_id and DagModel.next_dagrun
# This list is used to verify if the DagRun already exist so that we don't attempt to create
- # duplicate dag runs
+ # duplicate DagRuns
existing_dagruns = (
session.execute(
select(DagRun.dag_id, DagRun.logical_date).where(
@@ -1786,25 +1786,25 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
for dag_model in dag_models:
- dag = _get_current_dag(dag_id=dag_model.dag_id, session=session)
- if not dag:
+ serdag = _get_current_dag(dag_id=dag_model.dag_id, session=session)
+ if not serdag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
continue
- data_interval = get_next_data_interval(dag.timetable, dag_model)
+ data_interval = get_next_data_interval(serdag.timetable, dag_model)
# Explicitly check if the DagRun already exists. This is an edge case
# where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
# are not updated.
# We opted to check DagRun existence instead
# of catching an Integrity error and rolling back the session i.e
- # we need to set dag.next_dagrun_info if the Dag Run already exists or if we
- # create a new one. This is so that in the next Scheduling loop we try to create new runs
- # instead of falling in a loop of Integrity Error.
- if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+ # we need to set DagModel.next_dagrun_info if the DagRun already exists or if we
+ # create a new one. This is so that in the next scheduling loop we try to create new runs
+ # instead of falling in a loop of IntegrityError.
+ if (serdag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
try:
if dag_model.next_dagrun is not None and dag_model.next_dagrun_create_after is not None:
- dag.create_dagrun(
- run_id=dag.timetable.generate_run_id(
+ serdag.create_dagrun(
+ run_id=serdag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
run_after=timezone.coerce_datetime(dag_model.next_dagrun),
data_interval=data_interval,
@@ -1818,28 +1818,28 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
creating_job_id=self.job.id,
session=session,
)
- active_runs_of_dags[dag.dag_id] += 1
+ active_runs_of_dags[serdag.dag_id] += 1
else:
if dag_model.next_dagrun is None:
raise ValueError("dag_model.next_dagrun is None;
expected datetime")
raise ValueError("dag_model.next_dagrun_create_after
is None; expected datetime")
# Exceptions like ValueError, ParamValidationError, etc. are raised by
- # dag.create_dagrun() when dag is misconfigured. The scheduler should not
+ # DagModel.create_dagrun() when dag is misconfigured. The scheduler should not
# crash due to misconfigured dags. We should log any exception encountered
- # and continue to the next dag.
+ # and continue to the next serdag.
except Exception:
- self.log.exception("Failed creating DagRun for %s", dag.dag_id)
+ self.log.exception("Failed creating DagRun for %s", serdag.dag_id)
continue
if self._should_update_dag_next_dagruns(
- dag,
+ serdag,
dag_model,
last_dag_run=None,
- active_non_backfill_runs=active_runs_of_dags[dag.dag_id],
+ active_non_backfill_runs=active_runs_of_dags[serdag.dag_id],
session=session,
):
- dag_model.calculate_dagrun_date_fields(dag, data_interval)
+ dag_model.calculate_dagrun_date_fields(serdag, data_interval)
# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
- # memory for larger dags? or expunge_all()
+ # memory for larger dags? or expunge_all()
def _create_dag_runs_asset_triggered(
self,