pierrejeambrun commented on code in PR #45062:
URL: https://github.com/apache/airflow/pull/45062#discussion_r1910590969
##########
airflow/models/backfill.py:
##########
@@ -155,6 +155,73 @@ def validate_sort_ordinal(self, key, val):
return val
+def _get_latest_dag_run_row_query(info, session):
+ from airflow.models import DagRun
+
+ return (
+ select(DagRun)
+ .where(DagRun.logical_date == info.logical_date)
+ .order_by(nulls_first(desc(DagRun.start_date), session=session))
+ .limit(1)
+ )
+
+
+def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior)
-> str | None:
+ non_create_reason = None
+ if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
+ non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
+ elif reprocess_behavior is ReprocessBehavior.NONE:
+ non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
+ elif reprocess_behavior is ReprocessBehavior.FAILED:
+ if dr.state != DagRunState.FAILED:
+ non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
+ return non_create_reason
+
+
+def _validate_backfill_params(dag, reverse, reprocess_behavior:
ReprocessBehavior | None):
+ depends_on_past = None
+ depends_on_past = any(x.depends_on_past for x in dag.tasks)
+ if depends_on_past:
+ if reverse is True:
+ raise ValueError(
+ "Backfill cannot be run in reverse when the dag has tasks
where depends_on_past=True"
+ )
+ if reprocess_behavior in (None, ReprocessBehavior.NONE):
+ raise ValueError(
+ "Dag has task for which depends_on_past is true. "
+ "You must set reprocess behavior to reprocess completed or "
+ "reprocess failed"
+ )
+
+
+def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior) ->
list[datetime]:
Review Comment:
We most likely want to be able to pass a session here, in case the caller
is managing it (as is the case for the API).
##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -206,3 +209,30 @@ def create_backfill(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag
{backfill_request.dag_id}",
)
+
+
+@backfills_router.post(
+ path="/dry_run",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
+)
+def create_backfill_dry_run(
+ body: BackfillPostBody,
+) -> DryRunBackfillCollectionResponse:
+ from_date = timezone.coerce_datetime(body.from_date)
+ to_date = timezone.coerce_datetime(body.to_date)
+
+ backfills_dry_run = _do_dry_run(
+ dag_id=body.dag_id,
+ from_date=from_date,
+ to_date=to_date,
+ reverse=body.run_backwards,
+ reprocess_behavior=body.reprocess_behavior,
Review Comment:
And pass the session in here.
Usually one endpoint uses one session, opened at the beginning and committed
on exit. This is not blocking, just a suggestion for consistency with the
other endpoints.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]