ferruzzi commented on code in PR #68359:
URL: https://github.com/apache/airflow/pull/68359#discussion_r3431193632


##########
airflow-core/docs/administration-and-deployment/logging-monitoring/callbacks.rst:
##########
@@ -95,6 +98,46 @@ For example, a timeout may be caused by a number of stalling tasks, but only one
     Before Airflow 3.2.0, the rules above did not apply and the task instance passed to Dag callback was not related to Dag state, rather being selected as the latest task in the Dag
     lexicographically.
 
+Skipped Intervals Callback
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When a Dag is defined with ``catchup=False``, the scheduler may advance past missed scheduled
+intervals without creating Dag runs for them. Set ``on_skipped_intervals_callback`` on the Dag
+

Review Comment:
   If the Dag is paused, would this trigger the callback to fire on every scheduler pass (every 5 seconds)? Maybe it needs some kind of throttling so it doesn't hammer the callback.
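If throttling does turn out to be necessary, one simple shape is a per-Dag cooldown. The sketch below remembers when the callback last fired for each `dag_id` and suppresses it inside a time window. `SkippedIntervalsThrottle`, `should_fire`, and the one-hour default are hypothetical names and values, not part of this PR.

```python
from __future__ import annotations

import time
from typing import Callable


class SkippedIntervalsThrottle:
    """Hypothetical per-Dag cooldown for the skipped-intervals callback (not part of this PR)."""

    def __init__(self, cooldown_seconds: float = 3600.0, clock: Callable[[], float] = time.monotonic) -> None:
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock  # injectable so the behaviour can be tested without sleeping
        self._last_fired: dict[str, float] = {}  # dag_id -> clock value when the callback last fired

    def should_fire(self, dag_id: str) -> bool:
        """Return True and record the firing if dag_id is outside its cooldown window."""
        now = self._clock()
        last = self._last_fired.get(dag_id)
        if last is not None and now - last < self.cooldown_seconds:
            return False
        self._last_fired[dag_id] = now
        return True
```

The scheduler would call `should_fire(serdag.dag_id)` before building the callback request. This state lives in process memory, so it resets on restart and is not shared between schedulers in an HA setup. Persisting a "last notified" timestamp per Dag would avoid both problems.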



##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -389,6 +392,38 @@ def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: Fil
             stats.incr("dag.callback_exceptions", tags={"dag_id": request.dag_id})
 
 
+def _execute_dag_skipped_intervals_callback(
+    dagbag: DagBag, request: DagSkippedIntervalsCallbackRequest, log: FilteringBoundLogger
+) -> None:
+    from airflow._shared.timezones import timezone
+    from airflow.timetables.base import DataInterval
+
+    dag, _ = _get_dag_with_task(dagbag, request.dag_id)
+    callbacks = dag.on_skipped_intervals_callback
+    if not callbacks:
+        log.warning("Skipped intervals callback requested, but dag didn't have any", dag_id=request.dag_id)
+        return
+
+    callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
+    skipped_intervals = [
+        DataInterval(start=timezone.coerce_datetime(start), end=timezone.coerce_datetime(end))
+        for start, end in request.skipped_intervals
+    ]
+    context: Context = {  # type: ignore[typeddict-unknown-key]
+        "dag": dag,
+        "reason": "skipped_intervals",
+        "skipped_intervals": skipped_intervals,
+    }

Review Comment:
   Any time you need to add an `ignore`, that should be a red flag. I'm not saying it's never the right answer, but it very rarely is, and it usually masks a bigger issue. In this case, it looks like you are annotating it as a `Context` just to make mypy happy, but it's clearly not an actual `Context`.
   
   The most obvious answer would be to make a new type (`SkippedContext`?) and expand the accepted types where this gets sent.
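Here is a minimal sketch of that dedicated type. It assumes exactly the three keys the hunk above puts into the dict. `SkippedContext` is the placeholder name from this comment. `DataInterval` is a local stand-in NamedTuple so the snippet runs on its own; Airflow's `airflow.timetables.base.DataInterval` likewise exposes `start` and `end`. The helper `build_skipped_context` is hypothetical.

```python
from __future__ import annotations

from datetime import datetime
from typing import Any, Literal, NamedTuple, TypedDict


class DataInterval(NamedTuple):
    """Local stand-in for airflow.timetables.base.DataInterval, so the sketch runs on its own."""

    start: datetime
    end: datetime


class SkippedContext(TypedDict):
    """Hypothetical context passed to on_skipped_intervals_callback (name from the review)."""

    dag: Any  # the Dag object; typed loosely to keep the sketch self-contained
    reason: Literal["skipped_intervals"]
    skipped_intervals: list[DataInterval]


def build_skipped_context(dag: Any, intervals: list[tuple[datetime, datetime]]) -> SkippedContext:
    # Every key is declared on SkippedContext, so a type checker accepts this without "type: ignore".
    return {
        "dag": dag,
        "reason": "skipped_intervals",
        "skipped_intervals": [DataInterval(start=s, end=e) for s, e in intervals],
    }
```

Callback annotations that currently accept only `Context` would then need to accept `SkippedContext` as well, either through a union or a separate alias for this callback. That covers the "expand the accepted types" part.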



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2374,8 +2385,66 @@ def _mark_backfills_complete(self, *, session: Session = NEW_SESSION) -> None:
         for b in backfills:
             b.completed_at = now
 
-    def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
-        """Create a DAG run and update the dag_model to control if/when the next DAGRun should be created."""
+    def _collect_skipped_intervals(
+        self,
+        serdag: SerializedDAG,
+        new_data_interval: DataInterval,
+        session: Session,
+    ) -> list[tuple[DateTime, DateTime]]:
+        """
+        Return a list of (start, end) tuples for intervals skipped due to catchup=False.
+
+        Computes the intervals that would have been scheduled between the previous
+        automated DagRun's data_interval_end and the new run's data_interval_start,
+        had catchup been True.  Returns an empty list when there is no gap or when
+        no previous run exists.
+        """
+        if serdag.catchup:
+            return []
+        listener_has_impls = bool(
+            get_listener_manager().hook.on_intervals_skipped.get_hookimpls()  # type: ignore[attr-defined]
+        )
+        if not serdag.has_on_skipped_intervals_callback and not listener_has_impls:
+            return []
+
+        prev_run = session.scalar(
+            select(DagRun)
+            .where(
+                DagRun.dag_id == serdag.dag_id,
+                DagRun.run_type.in_([DagRunType.SCHEDULED]),
+                DagRun.data_interval_end.is_not(None),
+                DagRun.data_interval_end <= new_data_interval.start,
+            )
+            .order_by(DagRun.data_interval_end.desc())
+            .limit(1)
+        )
+        if prev_run is None or prev_run.data_interval_end is None:
+            return []
+
+        prev_end = prev_run.data_interval_end
+        new_start = new_data_interval.start
+        if prev_end >= new_start:
+            return []
+
+        skipped: list[tuple[DateTime, DateTime]] = []
+        for info in serdag.iter_dagrun_infos_between(prev_end, new_start):

Review Comment:
   If a Dag was scheduled to run every day and was paused for a week, `iter_dagrun_infos_between` is going to return seven skips. If it's scheduled more often and/or paused longer, this is going to blow up real fast. Do we actually want to send a list with every miss to the callback, or can we make this more efficient?
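One possible shape, sketched here, is to consume the intervals lazily and keep only a count plus a bounded sample of the earliest and latest intervals. For scale, a Dag on a one-minute schedule paused for 30 days would otherwise produce on the order of 30 * 24 * 60 = 43,200 entries. `SkippedSummary`, `summarize_skipped`, and the `sample` size are hypothetical. The input stands in for whatever `iter_dagrun_infos_between` yields after the filtering above.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from typing import Iterable


@dataclass
class SkippedSummary:
    """Hypothetical compact payload: a count plus the earliest and latest skipped intervals."""

    count: int = 0
    head: list[tuple[datetime, datetime]] = field(default_factory=list)
    tail: list[tuple[datetime, datetime]] = field(default_factory=list)


def summarize_skipped(intervals: Iterable[tuple[datetime, datetime]], sample: int = 3) -> SkippedSummary:
    """Walk the intervals once, keeping at most `sample` items at each end instead of all of them."""
    summary = SkippedSummary()
    tail: deque[tuple[datetime, datetime]] = deque(maxlen=sample)  # drops older items automatically
    for interval in intervals:
        if summary.count < sample:
            summary.head.append(interval)
        else:
            tail.append(interval)  # only intervals after the head compete for the tail
        summary.count += 1
    summary.tail = list(tail)
    return summary
```

Counting still walks every interval, so the scheduler-side cost stays proportional to the gap. Only the payload sent to callbacks and listeners becomes bounded.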



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2505,6 +2574,34 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                     active_non_backfill_runs=active_runs_of_dags[dag_model.dag_id],
                 )
 
+                if data_interval is not None:
+                    skipped = self._collect_skipped_intervals(
+                        serdag=serdag,
+                        new_data_interval=data_interval,
+                        session=session,
+                    )
+                    if skipped:
+                        try:
+                            get_listener_manager().hook.on_intervals_skipped(  # type: ignore[attr-defined]
+                                dag_id=serdag.dag_id,
+                                skipped_intervals=[DataInterval(start=s, end=e) for s, e in skipped],
+                            )
+                        except Exception:
+                            self.log.exception(
+                                "Error notifying listener of skipped intervals",
+                                dag_id=serdag.dag_id,
+                            )

Review Comment:
   This may eventually become an issue. It follows the existing patterns, so it's not really your problem, but it adds more blocking work to the scheduler. We try very hard to limit blocking work on the scheduler, since it creates a bottleneck for the whole system. We may eventually want to figure out how to move the "should this Dag callback fire?" logic somewhere else.
   
   
   No action required; feel free to resolve this once you've read it and understand the concern.
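If this does become a bottleneck, one generic pattern is to have the scheduler loop only enqueue a small notification record, while a background worker drains the queue and calls the listener. This is a sketch, not how Airflow currently works. `SkippedIntervalsNotifier`, `enqueue`, and `close` are hypothetical names. Only the listener call moves off the loop; the query in `_collect_skipped_intervals` would still run inline. A production version would likely need a durable queue, because an in-process queue is lost if the scheduler dies.

```python
from __future__ import annotations

import logging
import queue
import threading
from typing import Any, Callable

log = logging.getLogger(__name__)


class SkippedIntervalsNotifier:
    """Hypothetical helper: the scheduler loop enqueues, a background thread calls the listener."""

    def __init__(self, notify: Callable[..., Any]) -> None:
        self._notify = notify
        self._queue: queue.Queue = queue.Queue()
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def enqueue(self, dag_id: str, skipped_intervals: list) -> None:
        """Called from the scheduler loop; returns without waiting for the listener."""
        self._queue.put((dag_id, skipped_intervals))

    def close(self) -> None:
        """Stop the worker once everything queued so far has been handled."""
        self._queue.put(None)  # sentinel, processed after all earlier items (FIFO)
        self._worker.join()

    def _drain(self) -> None:
        while True:
            item = self._queue.get()
            if item is None:
                return
            dag_id, skipped_intervals = item
            try:
                self._notify(dag_id=dag_id, skipped_intervals=skipped_intervals)
            except Exception:
                # Same policy as the PR: a failing listener is logged, never raised into the caller.
                log.exception("Error notifying listener of skipped intervals for %s", dag_id)
```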



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2374,8 +2385,66 @@ def _mark_backfills_complete(self, *, session: Session = NEW_SESSION) -> None:
         for b in backfills:
             b.completed_at = now
 
-    def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
-        """Create a DAG run and update the dag_model to control if/when the next DAGRun should be created."""
+    def _collect_skipped_intervals(
+        self,
+        serdag: SerializedDAG,
+        new_data_interval: DataInterval,
+        session: Session,
+    ) -> list[tuple[DateTime, DateTime]]:
+        """
+        Return a list of (start, end) tuples for intervals skipped due to catchup=False.
+
+        Computes the intervals that would have been scheduled between the previous
+        automated DagRun's data_interval_end and the new run's data_interval_start,
+        had catchup been True.  Returns an empty list when there is no gap or when
+        no previous run exists.
+        """
+        if serdag.catchup:
+            return []
+        listener_has_impls = bool(
+            get_listener_manager().hook.on_intervals_skipped.get_hookimpls()  # type: ignore[attr-defined]
+        )
+        if not serdag.has_on_skipped_intervals_callback and not listener_has_impls:
+            return []
+
+        prev_run = session.scalar(
+            select(DagRun)
+            .where(
+                DagRun.dag_id == serdag.dag_id,
+                DagRun.run_type.in_([DagRunType.SCHEDULED]),

Review Comment:
   Style nitpick: 
   
   ```suggestion
                   DagRun.run_type == DagRunType.SCHEDULED,
   ```



##########
task-sdk/src/airflow/sdk/execution_time/schema/versions/__init__.py:
##########
@@ -19,7 +19,10 @@
 
 from cadwyn import HeadVersion, Version, VersionBundle
 
+from airflow.sdk.execution_time.schema.versions.v2026_06_16 import AddDagSkippedIntervalsCallbackRequest
+
 bundle = VersionBundle(

Review Comment:
   Just a heads-up that this is going to be a friction point when you rebase. Nothing you can do about it, just a warning.



##########
airflow-core/src/airflow/callbacks/callback_requests.py:
##########
@@ -172,8 +173,17 @@ class DagCallbackRequest(BaseCallbackRequest):
     type: Literal["DagCallbackRequest"] = "DagCallbackRequest"
 
 
+class DagSkippedIntervalsCallbackRequest(BaseCallbackRequest):
+    """A Class with information about the skipped intervals DAG callback to be executed."""

Review Comment:
   Convention is to use the imperative mood for class docstrings. In other words, don't tell me what it is; tell it what to do.
   
   For example, consider "Store skipped intervals ...." 



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2374,8 +2385,66 @@ def _mark_backfills_complete(self, *, session: Session = NEW_SESSION) -> None:
         for b in backfills:
             b.completed_at = now
 
-    def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
-        """Create a DAG run and update the dag_model to control if/when the next DAGRun should be created."""
+    def _collect_skipped_intervals(
+        self,
+        serdag: SerializedDAG,
+        new_data_interval: DataInterval,
+        session: Session,
+    ) -> list[tuple[DateTime, DateTime]]:
+        """
+        Return a list of (start, end) tuples for intervals skipped due to catchup=False.
+
+        Computes the intervals that would have been scheduled between the previous
+        automated DagRun's data_interval_end and the new run's data_interval_start,
+        had catchup been True.  Returns an empty list when there is no gap or when
+        no previous run exists.
+        """
+        if serdag.catchup:
+            return []
+        listener_has_impls = bool(
+            get_listener_manager().hook.on_intervals_skipped.get_hookimpls()  # type: ignore[attr-defined]
+        )
+        if not serdag.has_on_skipped_intervals_callback and not listener_has_impls:
+            return []
+
+        prev_run = session.scalar(
+            select(DagRun)
+            .where(
+                DagRun.dag_id == serdag.dag_id,
+                DagRun.run_type.in_([DagRunType.SCHEDULED]),
+                DagRun.data_interval_end.is_not(None),
+                DagRun.data_interval_end <= new_data_interval.start,
+            )
+            .order_by(DagRun.data_interval_end.desc())
+            .limit(1)
+        )
+        if prev_run is None or prev_run.data_interval_end is None:
+            return []
+
+        prev_end = prev_run.data_interval_end
+        new_start = new_data_interval.start
+        if prev_end >= new_start:
+            return []
+
+        skipped: list[tuple[DateTime, DateTime]] = []
+        for info in serdag.iter_dagrun_infos_between(prev_end, new_start):
+            if info.data_interval is None:
+                continue
+            if info.data_interval.start >= new_start:
+                continue
+            skipped.append((info.data_interval.start, info.data_interval.end))
+
+        return skipped
+
+    def _create_dag_runs(
+        self, dag_models: Collection[DagModel], session: Session
+    ) -> list[DagSkippedIntervalsCallbackRequest]:
+        """
+        Create a DAG run and update the dag_model to control if/when the next DAGRun should be created.

Review Comment:
   ```suggestion
           Create a Dag run and update the dag_model to control if/when the next DagRun should be created.
   ```



-- 
This is an automated message from the Apache Git Service.