ferruzzi commented on code in PR #68359:
URL: https://github.com/apache/airflow/pull/68359#discussion_r3444260561
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2374,8 +2385,66 @@ def _mark_backfills_complete(self, *, session: Session =
NEW_SESSION) -> None:
for b in backfills:
b.completed_at = now
- def _create_dag_runs(self, dag_models: Collection[DagModel], session:
Session) -> None:
- """Create a DAG run and update the dag_model to control if/when the
next DAGRun should be created."""
+ def _collect_skipped_intervals(
+ self,
+ serdag: SerializedDAG,
+ new_data_interval: DataInterval,
+ session: Session,
+ ) -> list[tuple[DateTime, DateTime]]:
+ """
+ Return a list of (start, end) tuples for intervals skipped due to
catchup=False.
+
+ Computes the intervals that would have been scheduled between the
previous
+ automated DagRun's data_interval_end and the new run's
data_interval_start,
+ had catchup been True. Returns an empty list when there is no gap or
when
+ no previous run exists.
+ """
+ if serdag.catchup:
+ return []
+ listener_has_impls = bool(
+ get_listener_manager().hook.on_intervals_skipped.get_hookimpls()
# type: ignore[attr-defined]
+ )
+ if not serdag.has_on_skipped_intervals_callback and not
listener_has_impls:
+ return []
+
+ prev_run = session.scalar(
+ select(DagRun)
+ .where(
+ DagRun.dag_id == serdag.dag_id,
+ DagRun.run_type.in_([DagRunType.SCHEDULED]),
+ DagRun.data_interval_end.is_not(None),
+ DagRun.data_interval_end <= new_data_interval.start,
+ )
+ .order_by(DagRun.data_interval_end.desc())
+ .limit(1)
+ )
+ if prev_run is None or prev_run.data_interval_end is None:
+ return []
+
+ prev_end = prev_run.data_interval_end
+ new_start = new_data_interval.start
+ if prev_end >= new_start:
+ return []
+
+ skipped: list[tuple[DateTime, DateTime]] = []
+ for info in serdag.iter_dagrun_infos_between(prev_end, new_start):
Review Comment:
I think option 2 is the cleanest way to get the same result. As
you said, the caller can always use the start and end to derive the rest.
Let's follow the "do one thing and do it right" principle.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]