pierrejeambrun commented on code in PR #45062:
URL: https://github.com/apache/airflow/pull/45062#discussion_r1911080457
##########
airflow/models/backfill.py:
##########
@@ -155,6 +155,73 @@ def validate_sort_ordinal(self, key, val):
return val
+def _get_latest_dag_run_row_query(info, session):
+ from airflow.models import DagRun
+
+ return (
+ select(DagRun)
+ .where(DagRun.logical_date == info.logical_date)
+ .order_by(nulls_first(desc(DagRun.start_date), session=session))
+ .limit(1)
+ )
+
+
+def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior)
-> str | None:
+ non_create_reason = None
+ if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
+ non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
+ elif reprocess_behavior is ReprocessBehavior.NONE:
+ non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
+ elif reprocess_behavior is ReprocessBehavior.FAILED:
+ if dr.state != DagRunState.FAILED:
+ non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
+ return non_create_reason
+
+
+def _validate_backfill_params(dag, reverse, reprocess_behavior:
ReprocessBehavior | None):
+ depends_on_past = None
+ depends_on_past = any(x.depends_on_past for x in dag.tasks)
+ if depends_on_past:
+ if reverse is True:
+ raise ValueError(
+ "Backfill cannot be run in reverse when the dag has tasks
where depends_on_past=True"
+ )
+ if reprocess_behavior in (None, ReprocessBehavior.NONE):
+ raise ValueError(
+ "Dag has task for which depends_on_past is true. "
+ "You must set reprocess behavior to reprocess completed or "
+ "reprocess failed"
+ )
+
+
+def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior) ->
list[datetime]:
Review Comment:
Thanks
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]