This is an automated email from the ASF dual-hosted git repository.
phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 65d6676d887 Add dry run for backfill (#45062)
65d6676d887 is described below
commit 65d6676d8872915bb349845669e1bca40e25ba67
Author: Sneha Prabhu <[email protected]>
AuthorDate: Sun Jan 12 22:18:52 2025 +0530
Add dry run for backfill (#45062)
* Add dry run for backfill
---------
Co-authored-by: Sneha Prabhu <[email protected]>
Co-authored-by: Daniel Standish <[email protected]>
---
.../api_fastapi/core_api/datamodels/backfills.py | 13 +++
.../api_fastapi/core_api/openapi/v1-generated.yaml | 76 +++++++++++++++++
.../core_api/routes/public/backfills.py | 32 +++++++
.../commands/remote_commands/backfill_command.py | 53 +++++-------
airflow/models/backfill.py | 99 ++++++++++++++++------
airflow/ui/openapi-gen/queries/common.ts | 3 +
airflow/ui/openapi-gen/queries/queries.ts | 36 ++++++++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 34 ++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 27 ++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 52 ++++++++++++
.../core_api/routes/public/test_backfills.py | 88 +++++++++++++++++++
11 files changed, 455 insertions(+), 58 deletions(-)
diff --git a/airflow/api_fastapi/core_api/datamodels/backfills.py b/airflow/api_fastapi/core_api/datamodels/backfills.py
index be04063907a..e36e50ea3b8 100644
--- a/airflow/api_fastapi/core_api/datamodels/backfills.py
+++ b/airflow/api_fastapi/core_api/datamodels/backfills.py
@@ -56,3 +56,16 @@ class BackfillCollectionResponse(BaseModel):
backfills: list[BackfillResponse]
total_entries: int
+
+
+class DryRunBackfillResponse(BaseModel):
+ """Backfill serializer for responses in dry-run mode."""
+
+ logical_date: datetime
+
+
+class DryRunBackfillCollectionResponse(BaseModel):
+ """Backfill collection serializer for responses in dry-run mode."""
+
+ backfills: list[DryRunBackfillResponse]
+ total_entries: int
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 5d06e7ab28e..92cd7219a67 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -1481,6 +1481,55 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/backfills/dry_run:
+ post:
+ tags:
+ - Backfill
+ summary: Create Backfill Dry Run
+ operationId: create_backfill_dry_run
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/BackfillPostBody'
+ required: true
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DryRunBackfillCollectionResponse'
+ '401':
+ description: Unauthorized
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ '403':
+ description: Forbidden
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ '404':
+ description: Not Found
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ '409':
+ description: Conflict
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/connections/{connection_id}:
delete:
tags:
@@ -7964,6 +8013,33 @@ components:
This is the set of allowable values for the ``warning_type`` field
in the DagWarning model.'
+ DryRunBackfillCollectionResponse:
+ properties:
+ backfills:
+ items:
+ $ref: '#/components/schemas/DryRunBackfillResponse'
+ type: array
+ title: Backfills
+ total_entries:
+ type: integer
+ title: Total Entries
+ type: object
+ required:
+ - backfills
+ - total_entries
+ title: DryRunBackfillCollectionResponse
+ description: Backfill collection serializer for responses in dry-run
mode.
+ DryRunBackfillResponse:
+ properties:
+ logical_date:
+ type: string
+ format: date-time
+ title: Logical Date
+ type: object
+ required:
+ - logical_date
+ title: DryRunBackfillResponse
+ description: Backfill serializer for responses in dry-run mode.
EdgeResponse:
properties:
is_setup_teardown:
diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py
index 61d25597cdd..4dae9ce3b47 100644
--- a/airflow/api_fastapi/core_api/routes/public/backfills.py
+++ b/airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -32,6 +32,8 @@ from airflow.api_fastapi.core_api.datamodels.backfills import
(
BackfillCollectionResponse,
BackfillPostBody,
BackfillResponse,
+ DryRunBackfillCollectionResponse,
+ DryRunBackfillResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
@@ -42,6 +44,7 @@ from airflow.models.backfill import (
Backfill,
BackfillDagRun,
_create_backfill,
+ _do_dry_run,
)
from airflow.utils import timezone
from airflow.utils.state import DagRunState
@@ -206,3 +209,32 @@ def create_backfill(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag
{backfill_request.dag_id}",
)
+
+
+@backfills_router.post(
+ path="/dry_run",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
+)
+def create_backfill_dry_run(
+ body: BackfillPostBody,
+ session: SessionDep,
+) -> DryRunBackfillCollectionResponse:
+ from_date = timezone.coerce_datetime(body.from_date)
+ to_date = timezone.coerce_datetime(body.to_date)
+
+ backfills_dry_run = _do_dry_run(
+ dag_id=body.dag_id,
+ from_date=from_date,
+ to_date=to_date,
+ reverse=body.run_backwards,
+ reprocess_behavior=body.reprocess_behavior,
+ session=session,
+ )
+ backfills = [DryRunBackfillResponse(logical_date=d) for d in
backfills_dry_run]
+
+ return DryRunBackfillCollectionResponse(backfills=backfills,
total_entries=len(backfills_dry_run))
diff --git a/airflow/cli/commands/remote_commands/backfill_command.py b/airflow/cli/commands/remote_commands/backfill_command.py
index 63a8573ab73..4256e6a8b36 100644
--- a/airflow/cli/commands/remote_commands/backfill_command.py
+++ b/airflow/cli/commands/remote_commands/backfill_command.py
@@ -21,33 +21,13 @@ import logging
import signal
from airflow import settings
-from airflow.models.backfill import ReprocessBehavior, _create_backfill,
_get_info_list
-from airflow.models.serialized_dag import SerializedDagModel
+from airflow.models.backfill import ReprocessBehavior, _create_backfill,
_do_dry_run
from airflow.utils import cli as cli_utils
from airflow.utils.cli import sigint_handler
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
from airflow.utils.session import create_session
-def _do_dry_run(*, params, dag_id, from_date, to_date, reverse):
- print("Performing dry run of backfill.")
- print("Printing params:")
- for k, v in params.items():
- print(f" - {k} = {v}")
- with create_session() as session:
- serdag = session.get(SerializedDagModel, dag_id)
-
- info_list = _get_info_list(
- dag=serdag.dag,
- from_date=from_date,
- to_date=to_date,
- reverse=reverse,
- )
- print("Logical dates to be attempted:")
- for info in info_list:
- print(f" - {info.logical_date}")
-
-
@cli_utils.action_cli
@providers_configuration_loaded
def create_backfill(args) -> None:
@@ -61,22 +41,33 @@ def create_backfill(args) -> None:
reprocess_behavior = None
if args.dry_run:
- _do_dry_run(
- params=dict(
- dag_id=args.dag_id,
- from_date=args.from_date,
- to_date=args.to_date,
- max_active_runs=args.max_active_runs,
- reverse=args.run_backwards,
- dag_run_conf=args.dag_run_conf,
- reprocess_behavior=reprocess_behavior,
- ),
+ print("Performing dry run of backfill.")
+ print("Printing params:")
+ params = dict(
dag_id=args.dag_id,
from_date=args.from_date,
to_date=args.to_date,
+ max_active_runs=args.max_active_runs,
reverse=args.run_backwards,
+ dag_run_conf=args.dag_run_conf,
+ reprocess_behavior=reprocess_behavior,
)
+ for k, v in params.items():
+ print(f" - {k} = {v}")
+ with create_session() as session:
+ logical_dates = _do_dry_run(
+ dag_id=args.dag_id,
+ from_date=args.from_date,
+ to_date=args.to_date,
+ reverse=args.reverse,
+ reprocess_behavior=args.reprocess_behavior,
+ session=session,
+ )
+ print("Logical dates to be attempted:")
+ for d in logical_dates:
+ print(f" - {d}")
return
+
_create_backfill(
dag_id=args.dag_id,
from_date=args.from_date,
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 0e88fa15bb0..39a28379a65 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -155,6 +155,72 @@ class BackfillDagRun(Base):
return val
+def _get_latest_dag_run_row_query(info, session):
+ from airflow.models import DagRun
+
+ return (
+ select(DagRun)
+ .where(DagRun.logical_date == info.logical_date)
+ .order_by(nulls_first(desc(DagRun.start_date), session=session))
+ .limit(1)
+ )
+
+
+def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior)
-> str | None:
+ non_create_reason = None
+ if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
+ non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
+ elif reprocess_behavior is ReprocessBehavior.NONE:
+ non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
+ elif reprocess_behavior is ReprocessBehavior.FAILED:
+ if dr.state != DagRunState.FAILED:
+ non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
+ return non_create_reason
+
+
+def _validate_backfill_params(dag, reverse, reprocess_behavior:
ReprocessBehavior | None):
+ depends_on_past = None
+ depends_on_past = any(x.depends_on_past for x in dag.tasks)
+ if depends_on_past:
+ if reverse is True:
+ raise ValueError(
+ "Backfill cannot be run in reverse when the dag has tasks
where depends_on_past=True"
+ )
+ if reprocess_behavior in (None, ReprocessBehavior.NONE):
+ raise ValueError(
+ "Dag has task for which depends_on_past is true. "
+ "You must set reprocess behavior to reprocess completed or "
+ "reprocess failed"
+ )
+
+
+def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior,
session) -> list[datetime]:
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ serdag =
session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
+ dag = serdag.dag
+ _validate_backfill_params(dag, reverse, reprocess_behavior)
+
+ dagrun_info_list = _get_info_list(
+ dag=dag,
+ from_date=from_date,
+ to_date=to_date,
+ reverse=reverse,
+ )
+ logical_dates = []
+ for info in dagrun_info_list:
+ dr = session.scalar(
+ statement=_get_latest_dag_run_row_query(info, session),
+ )
+ if dr:
+ non_create_reason = _get_dag_run_no_create_reason(dr,
reprocess_behavior)
+ if not non_create_reason:
+ logical_dates.append(info.logical_date)
+ else:
+ logical_dates.append(info.logical_date)
+ return logical_dates
+
+
def _create_backfill_dag_run(
*,
dag,
@@ -165,27 +231,15 @@ def _create_backfill_dag_run(
backfill_sort_ordinal,
session,
):
- from airflow.models import DagRun
-
with session.begin_nested() as nested:
dr = session.scalar(
with_row_locks(
- select(DagRun)
- .where(DagRun.logical_date == info.logical_date)
- .order_by(nulls_first(desc(DagRun.start_date),
session=session))
- .limit(1),
+ query=_get_latest_dag_run_row_query(info, session),
session=session,
- )
+ ),
)
if dr:
- non_create_reason = None
- if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
- non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
- elif reprocess_behavior is ReprocessBehavior.NONE:
- non_create_reason =
BackfillDagRunExceptionReason.ALREADY_EXISTS
- elif reprocess_behavior is ReprocessBehavior.FAILED:
- if dr.state != DagRunState.FAILED:
- non_create_reason =
BackfillDagRunExceptionReason.ALREADY_EXISTS
+ non_create_reason = _get_dag_run_no_create_reason(dr,
reprocess_behavior)
if non_create_reason:
# rolling back here restores to start of this nested tran
# which releases the lock on the latest dag run, since we
@@ -272,18 +326,8 @@ def _create_backfill(
)
dag = serdag.dag
- depends_on_past = any(x.depends_on_past for x in dag.tasks)
- if depends_on_past:
- if reverse is True:
- raise ValueError(
- "Backfill cannot be run in reverse when the dag has tasks
where depends_on_past=True"
- )
- if reprocess_behavior in (None, ReprocessBehavior.NONE):
- raise ValueError(
- "Dag has task for which depends_on_past is true. "
- "You must set reprocess behavior to reprocess completed or
"
- "reprocess failed"
- )
+ _validate_backfill_params(dag, reverse, reprocess_behavior)
+
br = Backfill(
dag_id=dag_id,
from_date=from_date,
@@ -316,6 +360,7 @@ def _create_backfill(
)
if not dag_model:
raise RuntimeError(f"Dag {dag_id} not found")
+
for info in dagrun_info_list:
backfill_sort_ordinal += 1
_create_backfill_dag_run(
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index 8335d80e698..fea6805d40e 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1596,6 +1596,9 @@ export type AssetServiceCreateAssetEventMutationResult =
Awaited<
export type BackfillServiceCreateBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.createBackfill>
>;
+export type BackfillServiceCreateBackfillDryRunMutationResult = Awaited<
+ ReturnType<typeof BackfillService.createBackfillDryRun>
+>;
export type ConnectionServicePostConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.postConnection>
>;
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index 9a816460704..801387eebb8 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -2710,6 +2710,42 @@ export const useBackfillServiceCreateBackfill = <
BackfillService.createBackfill({ requestBody }) as unknown as
Promise<TData>,
...options,
});
+/**
+ * Create Backfill Dry Run
+ * @param data The data for the request.
+ * @param data.requestBody
+ * @returns DryRunBackfillCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useBackfillServiceCreateBackfillDryRun = <
+ TData = Common.BackfillServiceCreateBackfillDryRunMutationResult,
+ TError = unknown,
+ TContext = unknown,
+>(
+ options?: Omit<
+ UseMutationOptions<
+ TData,
+ TError,
+ {
+ requestBody: BackfillPostBody;
+ },
+ TContext
+ >,
+ "mutationFn"
+ >,
+) =>
+ useMutation<
+ TData,
+ TError,
+ {
+ requestBody: BackfillPostBody;
+ },
+ TContext
+ >({
+ mutationFn: ({ requestBody }) =>
+ BackfillService.createBackfillDryRun({ requestBody }) as unknown as
Promise<TData>,
+ ...options,
+ });
/**
* Post Connection
* Create connection entry.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 64a8fcc9e81..443aef97a33 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2640,6 +2640,40 @@ This is the set of allowable values for the
\`\`warning_type\`\` field
in the DagWarning model.`,
} as const;
+export const $DryRunBackfillCollectionResponse = {
+ properties: {
+ backfills: {
+ items: {
+ $ref: "#/components/schemas/DryRunBackfillResponse",
+ },
+ type: "array",
+ title: "Backfills",
+ },
+ total_entries: {
+ type: "integer",
+ title: "Total Entries",
+ },
+ },
+ type: "object",
+ required: ["backfills", "total_entries"],
+ title: "DryRunBackfillCollectionResponse",
+ description: "Backfill collection serializer for responses in dry-run mode.",
+} as const;
+
+export const $DryRunBackfillResponse = {
+ properties: {
+ logical_date: {
+ type: "string",
+ format: "date-time",
+ title: "Logical Date",
+ },
+ },
+ type: "object",
+ required: ["logical_date"],
+ title: "DryRunBackfillResponse",
+ description: "Backfill serializer for responses in dry-run mode.",
+} as const;
+
export const $EdgeResponse = {
properties: {
is_setup_teardown: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index 01666ae0904..c2a5a3ce6ec 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -54,6 +54,8 @@ import type {
UnpauseBackfillResponse,
CancelBackfillData,
CancelBackfillResponse,
+ CreateBackfillDryRunData,
+ CreateBackfillDryRunResponse,
GridDataData,
GridDataResponse,
DeleteConnectionData,
@@ -927,6 +929,31 @@ export class BackfillService {
},
});
}
+
+ /**
+ * Create Backfill Dry Run
+ * @param data The data for the request.
+ * @param data.requestBody
+ * @returns DryRunBackfillCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static createBackfillDryRun(
+ data: CreateBackfillDryRunData,
+ ): CancelablePromise<CreateBackfillDryRunResponse> {
+ return __request(OpenAPI, {
+ method: "POST",
+ url: "/public/backfills/dry_run",
+ body: data.requestBody,
+ mediaType: "application/json",
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 409: "Conflict",
+ 422: "Validation Error",
+ },
+ });
+ }
}
export class GridService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 232e66d2464..f2f709b02db 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -637,6 +637,21 @@ export type DagTagResponse = {
*/
export type DagWarningType = "asset conflict" | "non-existent pool";
+/**
+ * Backfill collection serializer for responses in dry-run mode.
+ */
+export type DryRunBackfillCollectionResponse = {
+ backfills: Array<DryRunBackfillResponse>;
+ total_entries: number;
+};
+
+/**
+ * Backfill serializer for responses in dry-run mode.
+ */
+export type DryRunBackfillResponse = {
+ logical_date: string;
+};
+
/**
* Edge serializer for responses.
*/
@@ -1552,6 +1567,12 @@ export type CancelBackfillData = {
export type CancelBackfillResponse = BackfillResponse;
+export type CreateBackfillDryRunData = {
+ requestBody: BackfillPostBody;
+};
+
+export type CreateBackfillDryRunResponse = DryRunBackfillCollectionResponse;
+
export type GridDataData = {
dagId: string;
includeDownstream?: boolean;
@@ -2819,6 +2840,37 @@ export type $OpenApiTs = {
};
};
};
+ "/public/backfills/dry_run": {
+ post: {
+ req: CreateBackfillDryRunData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: DryRunBackfillCollectionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Conflict
+ */
+ 409: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/ui/grid/{dag_id}": {
get: {
req: GridDataData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py
index 1c64b10848f..4f0ae7918e4 100644
--- a/tests/api_fastapi/core_api/routes/public/test_backfills.py
+++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py
@@ -192,6 +192,7 @@ class TestCreateBackfill(TestBackfillEndpoint):
"max_active_runs": max_active_runs,
"run_backwards": False,
"dag_run_conf": {"param1": "val1", "param2": True},
+ "dry_run": False,
}
if repro_act is not None:
data["reprocess_behavior"] = repro_act
@@ -215,6 +216,93 @@ class TestCreateBackfill(TestBackfillEndpoint):
}
+class TestCreateBackfillDryRun(TestBackfillEndpoint):
+ @pytest.mark.parametrize(
+ "reprocess_behavior, expected_dates",
+ [
+ (
+ "none",
+ [
+ {"logical_date": "2024-01-01T00:00:00Z"},
+ {"logical_date": "2024-01-04T00:00:00Z"},
+ {"logical_date": "2024-01-05T00:00:00Z"},
+ ],
+ ),
+ (
+ "failed",
+ [
+ {"logical_date": "2024-01-01T00:00:00Z"},
+ {"logical_date": "2024-01-03T00:00:00Z"}, # Reprocess
failed
+ {"logical_date": "2024-01-04T00:00:00Z"},
+ {"logical_date": "2024-01-05T00:00:00Z"},
+ ],
+ ),
+ (
+ "completed",
+ [
+ {"logical_date": "2024-01-01T00:00:00Z"},
+ {"logical_date": "2024-01-02T00:00:00Z"}, # Reprocess all
+ {"logical_date": "2024-01-03T00:00:00Z"},
+ {"logical_date": "2024-01-04T00:00:00Z"},
+ {"logical_date": "2024-01-05T00:00:00Z"},
+ ],
+ ),
+ ],
+ )
+ def test_create_backfill_dry_run(
+ self, session, dag_maker, test_client, reprocess_behavior,
expected_dates
+ ):
+ with dag_maker(
+ session=session,
+ dag_id="TEST_DAG_2",
+ schedule="0 0 * * *",
+ start_date=pendulum.parse("2024-01-01"),
+ ) as dag:
+ EmptyOperator(task_id="mytask")
+
+ session.commit()
+
+ existing_dagruns = [
+ {"logical_date": pendulum.parse("2024-01-02"), "state":
DagRunState.SUCCESS}, # Completed dag run
+ {"logical_date": pendulum.parse("2024-01-03"), "state":
DagRunState.FAILED}, # Failed dag run
+ ]
+ for dagrun in existing_dagruns:
+ session.add(
+ DagRun(
+ dag_id=dag.dag_id,
+ run_id=f"manual__{dagrun['logical_date'].isoformat()}",
+ logical_date=dagrun["logical_date"],
+ state=dagrun["state"],
+ run_type="scheduled",
+ )
+ )
+ session.commit()
+
+ from_date = pendulum.parse("2024-01-01")
+ from_date_iso = to_iso(from_date)
+ to_date = pendulum.parse("2024-01-05")
+ to_date_iso = to_iso(to_date)
+
+ data = {
+ "dag_id": dag.dag_id,
+ "from_date": from_date_iso,
+ "to_date": to_date_iso,
+ "max_active_runs": 5,
+ "run_backwards": False,
+ "dag_run_conf": {"param1": "val1", "param2": True},
+ "reprocess_behavior": reprocess_behavior,
+ }
+
+ response = test_client.post(
+ url="/public/backfills/dry_run",
+ json=data,
+ )
+
+ assert response.status_code == 200
+ response_json = response.json()
+ assert response_json["backfills"] == expected_dates
+
+
class TestCancelBackfill(TestBackfillEndpoint):
def test_cancel_backfill(self, session, test_client):
(dag,) = self._create_dag_models()