ferruzzi commented on code in PR #68359:
URL: https://github.com/apache/airflow/pull/68359#discussion_r3431193632
##########
airflow-core/docs/administration-and-deployment/logging-monitoring/callbacks.rst:
##########
@@ -95,6 +98,46 @@ For example, a timeout may be caused by a number of stalling tasks, but only one
Before Airflow 3.2.0, the rules above did not apply and the task instance
passed to Dag callback was not related to Dag state, rather being selected as
the latest task in the Dag
lexicographically.
+Skipped Intervals Callback
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When a Dag is defined with ``catchup=False``, the scheduler may advance past missed scheduled
+intervals without creating Dag runs for them. Set ``on_skipped_intervals_callback`` on the Dag
+to be notified when that happens.
+
Review Comment:
If the Dag is paused, would this trigger the callback to fire on every
scheduler pass (every 5 seconds)? Maybe it needs some kind of throttling so it
doesn't hammer the callback.
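To make the throttling idea concrete, here is a minimal sketch of a per-Dag rate limiter. `CallbackThrottle` and `should_fire` are hypothetical names, not part of Airflow, and the state is kept in memory, so it assumes a single scheduler process and would not survive a restart. A real implementation would probably need to persist the last-fired time somewhere shared.

```python
import time


class CallbackThrottle:
    """Hypothetical helper: allow at most one callback per Dag per time window."""

    def __init__(self, min_interval_seconds, clock=time.monotonic):
        self._min_interval = min_interval_seconds
        # The clock is injectable so the behaviour can be tested without sleeping.
        self._clock = clock
        # Maps dag_id -> clock reading at which the callback last fired.
        self._last_fired = {}

    def should_fire(self, dag_id):
        """Return True and record the time if the window for dag_id has elapsed."""
        now = self._clock()
        last = self._last_fired.get(dag_id)
        if last is not None and now - last < self._min_interval:
            return False
        self._last_fired[dag_id] = now
        return True
```

The scheduler would call `should_fire(dag_id)` before queueing the callback request and skip the request when it returns `False`.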
##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -389,6 +392,38 @@ def _execute_dag_callbacks(dagbag: DagBag, request:
DagCallbackRequest, log: Fil
stats.incr("dag.callback_exceptions", tags={"dag_id":
request.dag_id})
+def _execute_dag_skipped_intervals_callback(
+    dagbag: DagBag, request: DagSkippedIntervalsCallbackRequest, log: FilteringBoundLogger
+) -> None:
+    from airflow._shared.timezones import timezone
+    from airflow.timetables.base import DataInterval
+
+    dag, _ = _get_dag_with_task(dagbag, request.dag_id)
+    callbacks = dag.on_skipped_intervals_callback
+    if not callbacks:
+        log.warning("Skipped intervals callback requested, but dag didn't have any", dag_id=request.dag_id)
+        return
+
+    callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
+    skipped_intervals = [
+        DataInterval(start=timezone.coerce_datetime(start), end=timezone.coerce_datetime(end))
+        for start, end in request.skipped_intervals
+    ]
+    context: Context = {  # type: ignore[typeddict-unknown-key]
+        "dag": dag,
+        "reason": "skipped_intervals",
+        "skipped_intervals": skipped_intervals,
+    }
Review Comment:
Needing to add an `ignore` should be a red flag. I'm not saying it's never
the right answer, but it very rarely is, and it usually masks a bigger issue.
In this case, it looks like you are typing it as a `Context` just to make
mypy happy, but it's clearly not an actual `Context`.
The most obvious answer would be to make a new type (`SkippedContext`?) and
expand the accepted types wherever this gets passed.
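As a rough illustration of the new-type idea, something along these lines would remove the need for the `ignore`. This is only a sketch: `SkippedContext` is the name floated above, `build_skipped_context` is a hypothetical helper, and `Interval` is a stand-in for `DataInterval` so the snippet runs without Airflow installed.

```python
from datetime import datetime, timezone
from typing import Any, List, NamedTuple, TypedDict


class Interval(NamedTuple):
    """Stand-in for airflow.timetables.base.DataInterval (assumption for this sketch)."""

    start: datetime
    end: datetime


class SkippedContext(TypedDict):
    """Describe the keys passed to a skipped-intervals callback."""

    dag: Any  # the Dag object in real code; Any keeps the sketch standalone
    reason: str
    skipped_intervals: List[Interval]


def build_skipped_context(dag: Any, skipped_intervals: List[Interval]) -> SkippedContext:
    # Every key is declared on SkippedContext, so no type: ignore is needed here.
    return {
        "dag": dag,
        "reason": "skipped_intervals",
        "skipped_intervals": skipped_intervals,
    }
```

Whatever accepts the context today would then need its annotation widened to something like `Context | SkippedContext`.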
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2374,8 +2385,66 @@ def _mark_backfills_complete(self, *, session: Session =
NEW_SESSION) -> None:
for b in backfills:
b.completed_at = now
- def _create_dag_runs(self, dag_models: Collection[DagModel], session:
Session) -> None:
- """Create a DAG run and update the dag_model to control if/when the
next DAGRun should be created."""
+ def _collect_skipped_intervals(
+ self,
+ serdag: SerializedDAG,
+ new_data_interval: DataInterval,
+ session: Session,
+ ) -> list[tuple[DateTime, DateTime]]:
+ """
+ Return a list of (start, end) tuples for intervals skipped due to
catchup=False.
+
+ Computes the intervals that would have been scheduled between the
previous
+ automated DagRun's data_interval_end and the new run's
data_interval_start,
+ had catchup been True. Returns an empty list when there is no gap or
when
+ no previous run exists.
+ """
+ if serdag.catchup:
+ return []
+ listener_has_impls = bool(
+ get_listener_manager().hook.on_intervals_skipped.get_hookimpls()
# type: ignore[attr-defined]
+ )
+ if not serdag.has_on_skipped_intervals_callback and not
listener_has_impls:
+ return []
+
+ prev_run = session.scalar(
+ select(DagRun)
+ .where(
+ DagRun.dag_id == serdag.dag_id,
+ DagRun.run_type.in_([DagRunType.SCHEDULED]),
+ DagRun.data_interval_end.is_not(None),
+ DagRun.data_interval_end <= new_data_interval.start,
+ )
+ .order_by(DagRun.data_interval_end.desc())
+ .limit(1)
+ )
+ if prev_run is None or prev_run.data_interval_end is None:
+ return []
+
+ prev_end = prev_run.data_interval_end
+ new_start = new_data_interval.start
+ if prev_end >= new_start:
+ return []
+
+ skipped: list[tuple[DateTime, DateTime]] = []
+ for info in serdag.iter_dagrun_infos_between(prev_end, new_start):
Review Comment:
If a Dag was scheduled to run every day and was paused for a week,
`iter_dagrun_infos_between` is going to return seven skips. If it's scheduled
more often and/or paused longer, this is going to blow up real fast. Do we
actually want to send a list with every miss to the callback or can we make
this more efficient?
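One possible shape for a cheaper version, sketched under assumptions: read at most a fixed number of intervals from the iterator and flag when there were more. `summarize_skipped`, `SkippedSummary`, and `hourly_intervals` are hypothetical names, and `hourly_intervals` only stands in for `iter_dagrun_infos_between` so the example is self-contained.

```python
from datetime import datetime, timedelta, timezone
from itertools import islice
from typing import NamedTuple


class SkippedSummary(NamedTuple):
    sample: list  # at most max_items (start, end) tuples
    truncated: bool  # True when more intervals existed than were kept


def hourly_intervals(start, end):
    """Yield consecutive one-hour (start, end) tuples that fit inside [start, end]."""
    step = timedelta(hours=1)
    current = start
    while current + step <= end:
        yield (current, current + step)
        current += step


def summarize_skipped(intervals, max_items):
    """Consume at most max_items + 1 intervals so a long gap stays cheap."""
    head = list(islice(intervals, max_items + 1))
    return SkippedSummary(sample=head[:max_items], truncated=len(head) > max_items)
```

With an hourly schedule paused for a week, this reads 11 intervals instead of 168 when `max_items=10`. The callback could also be given just the gap bounds (`prev_end`, `new_start`) and left to expand them itself if it cares.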
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2505,6 +2574,34 @@ def _create_dag_runs(self, dag_models:
Collection[DagModel], session: Session) -
active_non_backfill_runs=active_runs_of_dags[dag_model.dag_id],
)
+ if data_interval is not None:
+ skipped = self._collect_skipped_intervals(
+ serdag=serdag,
+ new_data_interval=data_interval,
+ session=session,
+ )
+ if skipped:
+ try:
+ get_listener_manager().hook.on_intervals_skipped(
# type: ignore[attr-defined]
+ dag_id=serdag.dag_id,
+ skipped_intervals=[DataInterval(start=s,
end=e) for s, e in skipped],
+ )
+ except Exception:
+ self.log.exception(
+ "Error notifying listener of skipped
intervals",
+ dag_id=serdag.dag_id,
+ )
Review Comment:
This actually may eventually be an issue. It's following the existing
patterns so not really your problem, but this is adding more blocking work on
the scheduler. We try very hard to limit any blocking processes on the
scheduler since that creates a bottleneck in the system. We may eventually
want to figure out how to move the "should this dag callback fire?" logic off
somewhere else.
No action required, feel free to resolve this once you've read it and
understand the concern.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2374,8 +2385,66 @@ def _mark_backfills_complete(self, *, session: Session =
NEW_SESSION) -> None:
for b in backfills:
b.completed_at = now
- def _create_dag_runs(self, dag_models: Collection[DagModel], session:
Session) -> None:
- """Create a DAG run and update the dag_model to control if/when the
next DAGRun should be created."""
+ def _collect_skipped_intervals(
+ self,
+ serdag: SerializedDAG,
+ new_data_interval: DataInterval,
+ session: Session,
+ ) -> list[tuple[DateTime, DateTime]]:
+ """
+ Return a list of (start, end) tuples for intervals skipped due to
catchup=False.
+
+ Computes the intervals that would have been scheduled between the
previous
+ automated DagRun's data_interval_end and the new run's
data_interval_start,
+ had catchup been True. Returns an empty list when there is no gap or
when
+ no previous run exists.
+ """
+ if serdag.catchup:
+ return []
+ listener_has_impls = bool(
+ get_listener_manager().hook.on_intervals_skipped.get_hookimpls()
# type: ignore[attr-defined]
+ )
+ if not serdag.has_on_skipped_intervals_callback and not
listener_has_impls:
+ return []
+
+ prev_run = session.scalar(
+ select(DagRun)
+ .where(
+ DagRun.dag_id == serdag.dag_id,
+ DagRun.run_type.in_([DagRunType.SCHEDULED]),
Review Comment:
Style nitpick:
```suggestion
DagRun.run_type == DagRunType.SCHEDULED,
```
##########
task-sdk/src/airflow/sdk/execution_time/schema/versions/__init__.py:
##########
@@ -19,7 +19,10 @@
from cadwyn import HeadVersion, Version, VersionBundle
+from airflow.sdk.execution_time.schema.versions.v2026_06_16 import
AddDagSkippedIntervalsCallbackRequest
+
bundle = VersionBundle(
Review Comment:
Just a heads up that this is going to be a friction point when you rebase.
Nothing you can do about it, just a warning.
##########
airflow-core/src/airflow/callbacks/callback_requests.py:
##########
@@ -172,8 +173,17 @@ class DagCallbackRequest(BaseCallbackRequest):
type: Literal["DagCallbackRequest"] = "DagCallbackRequest"
+class DagSkippedIntervalsCallbackRequest(BaseCallbackRequest):
+ """A Class with information about the skipped intervals DAG callback to be
executed."""
Review Comment:
Convention is to use imperative tone for class docstrings. In other words,
don't tell me what it is, tell it what to do.
For example, consider "Store skipped intervals ...."
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2374,8 +2385,66 @@ def _mark_backfills_complete(self, *, session: Session =
NEW_SESSION) -> None:
for b in backfills:
b.completed_at = now
- def _create_dag_runs(self, dag_models: Collection[DagModel], session:
Session) -> None:
- """Create a DAG run and update the dag_model to control if/when the
next DAGRun should be created."""
+ def _collect_skipped_intervals(
+ self,
+ serdag: SerializedDAG,
+ new_data_interval: DataInterval,
+ session: Session,
+ ) -> list[tuple[DateTime, DateTime]]:
+ """
+ Return a list of (start, end) tuples for intervals skipped due to
catchup=False.
+
+ Computes the intervals that would have been scheduled between the
previous
+ automated DagRun's data_interval_end and the new run's
data_interval_start,
+ had catchup been True. Returns an empty list when there is no gap or
when
+ no previous run exists.
+ """
+ if serdag.catchup:
+ return []
+ listener_has_impls = bool(
+ get_listener_manager().hook.on_intervals_skipped.get_hookimpls()
# type: ignore[attr-defined]
+ )
+ if not serdag.has_on_skipped_intervals_callback and not
listener_has_impls:
+ return []
+
+ prev_run = session.scalar(
+ select(DagRun)
+ .where(
+ DagRun.dag_id == serdag.dag_id,
+ DagRun.run_type.in_([DagRunType.SCHEDULED]),
+ DagRun.data_interval_end.is_not(None),
+ DagRun.data_interval_end <= new_data_interval.start,
+ )
+ .order_by(DagRun.data_interval_end.desc())
+ .limit(1)
+ )
+ if prev_run is None or prev_run.data_interval_end is None:
+ return []
+
+ prev_end = prev_run.data_interval_end
+ new_start = new_data_interval.start
+ if prev_end >= new_start:
+ return []
+
+ skipped: list[tuple[DateTime, DateTime]] = []
+ for info in serdag.iter_dagrun_infos_between(prev_end, new_start):
+ if info.data_interval is None:
+ continue
+ if info.data_interval.start >= new_start:
+ continue
+ skipped.append((info.data_interval.start, info.data_interval.end))
+
+ return skipped
+
+ def _create_dag_runs(
+ self, dag_models: Collection[DagModel], session: Session
+ ) -> list[DagSkippedIntervalsCallbackRequest]:
+ """
+ Create a DAG run and update the dag_model to control if/when the next
DAGRun should be created.
Review Comment:
```suggestion
Create a Dag run and update the dag_model to control if/when the next DagRun should be created.
```