jedcunningham commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r747009307
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,26 +913,23 @@ def _create_dag_runs(self, dag_models:
Collection[DagModel], session: Session) -
creating_job_id=self.id,
)
active_runs_of_dags[dag.dag_id] += 1
- self._update_dag_next_dagruns(dag, dag_model,
active_runs_of_dags[dag.dag_id])
+ if self._should_update_dag_next_dagruns(dag, dag_model,
active_runs_of_dags[dag.dag_id]):
Review comment:
For example:
```
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -913,11 +913,11 @@ class SchedulerJob(BaseJob):
creating_job_id=self.id,
)
active_runs_of_dags[dag.dag_id] += 1
- self._update_dag_next_dagruns(dag, dag_model,
active_runs_of_dags[dag.dag_id])
+ self._update_dag_next_dagruns(dag, dag_model,
active_runs_of_dags[dag.dag_id], data_interval)
# TODO[HA]: Should we do a session.flush() so we don't have to keep
lots of state/object in
# memory for larger dags? or expunge_all()
- def _update_dag_next_dagruns(self, dag, dag_model: DagModel,
total_active_runs) -> None:
+ def _update_dag_next_dagruns(self, dag, dag_model: DagModel,
total_active_runs, data_interval = None) -> None:
"""
Update the next_dagrun, next_dagrun_data_interval_start/end
and next_dagrun_create_after for this dag.
@@ -931,7 +931,8 @@ class SchedulerJob(BaseJob):
)
dag_model.next_dagrun_create_after = None
else:
- data_interval = dag.get_next_data_interval(dag_model)
+ if not data_interval:
+ data_interval = dag.get_next_data_interval(dag_model)
dag_model.calculate_dagrun_date_fields(dag, data_interval)
```
Using a sentinel default instead of `None` would probably be safer, since it distinguishes "no interval passed" from an explicit `None`. Still, I think `None` is okay here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]