Tegh25 commented on code in PR #68359:
URL: https://github.com/apache/airflow/pull/68359#discussion_r3440637261
##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -389,6 +392,38 @@ def _execute_dag_callbacks(dagbag: DagBag, request:
DagCallbackRequest, log: Fil
stats.incr("dag.callback_exceptions", tags={"dag_id":
request.dag_id})
+def _execute_dag_skipped_intervals_callback(
+ dagbag: DagBag, request: DagSkippedIntervalsCallbackRequest, log:
FilteringBoundLogger
+) -> None:
+ from airflow._shared.timezones import timezone
+ from airflow.timetables.base import DataInterval
+
+ dag, _ = _get_dag_with_task(dagbag, request.dag_id)
+ callbacks = dag.on_skipped_intervals_callback
+ if not callbacks:
+ log.warning("Skipped intervals callback requested, but dag didn't have
any", dag_id=request.dag_id)
+ return
+
+ callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
+ skipped_intervals = [
+ DataInterval(start=timezone.coerce_datetime(start),
end=timezone.coerce_datetime(end))
+ for start, end in request.skipped_intervals
+ ]
+ context: Context = { # type: ignore[typeddict-unknown-key]
+ "dag": dag,
+ "reason": "skipped_intervals",
+ "skipped_intervals": skipped_intervals,
+ }
Review Comment:
Thanks, resolved with
[9f3d165](https://github.com/apache/airflow/pull/68359/commits/9f3d165023591c668b4ae172faaa61cba6206090)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]