This is an automated email from the ASF dual-hosted git repository.

phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 65d6676d887 Add dry run for backfill (#45062)
65d6676d887 is described below

commit 65d6676d8872915bb349845669e1bca40e25ba67
Author: Sneha Prabhu <[email protected]>
AuthorDate: Sun Jan 12 22:18:52 2025 +0530

    Add dry run for backfill (#45062)
    
    * Add dry run for backfill
    
    ---------
    
    Co-authored-by: Sneha Prabhu <[email protected]>
    Co-authored-by: Daniel Standish <[email protected]>
---
 .../api_fastapi/core_api/datamodels/backfills.py   | 13 +++
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 76 +++++++++++++++++
 .../core_api/routes/public/backfills.py            | 32 +++++++
 .../commands/remote_commands/backfill_command.py   | 53 +++++-------
 airflow/models/backfill.py                         | 99 ++++++++++++++++------
 airflow/ui/openapi-gen/queries/common.ts           |  3 +
 airflow/ui/openapi-gen/queries/queries.ts          | 36 ++++++++
 airflow/ui/openapi-gen/requests/schemas.gen.ts     | 34 ++++++++
 airflow/ui/openapi-gen/requests/services.gen.ts    | 27 ++++++
 airflow/ui/openapi-gen/requests/types.gen.ts       | 52 ++++++++++++
 .../core_api/routes/public/test_backfills.py       | 88 +++++++++++++++++++
 11 files changed, 455 insertions(+), 58 deletions(-)

diff --git a/airflow/api_fastapi/core_api/datamodels/backfills.py 
b/airflow/api_fastapi/core_api/datamodels/backfills.py
index be04063907a..e36e50ea3b8 100644
--- a/airflow/api_fastapi/core_api/datamodels/backfills.py
+++ b/airflow/api_fastapi/core_api/datamodels/backfills.py
@@ -56,3 +56,16 @@ class BackfillCollectionResponse(BaseModel):
 
     backfills: list[BackfillResponse]
     total_entries: int
+
+
+class DryRunBackfillResponse(BaseModel):
+    """Backfill serializer for responses in dry-run mode."""
+
+    logical_date: datetime
+
+
+class DryRunBackfillCollectionResponse(BaseModel):
+    """Backfill collection serializer for responses in dry-run mode."""
+
+    backfills: list[DryRunBackfillResponse]
+    total_entries: int
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 5d06e7ab28e..92cd7219a67 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -1481,6 +1481,55 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /public/backfills/dry_run:
+    post:
+      tags:
+      - Backfill
+      summary: Create Backfill Dry Run
+      operationId: create_backfill_dry_run
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/BackfillPostBody'
+        required: true
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DryRunBackfillCollectionResponse'
+        '401':
+          description: Unauthorized
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+        '403':
+          description: Forbidden
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+        '404':
+          description: Not Found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+        '409':
+          description: Conflict
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/connections/{connection_id}:
     delete:
       tags:
@@ -7964,6 +8013,33 @@ components:
         This is the set of allowable values for the ``warning_type`` field
 
         in the DagWarning model.'
+    DryRunBackfillCollectionResponse:
+      properties:
+        backfills:
+          items:
+            $ref: '#/components/schemas/DryRunBackfillResponse'
+          type: array
+          title: Backfills
+        total_entries:
+          type: integer
+          title: Total Entries
+      type: object
+      required:
+      - backfills
+      - total_entries
+      title: DryRunBackfillCollectionResponse
+      description: Backfill collection serializer for responses in dry-run 
mode.
+    DryRunBackfillResponse:
+      properties:
+        logical_date:
+          type: string
+          format: date-time
+          title: Logical Date
+      type: object
+      required:
+      - logical_date
+      title: DryRunBackfillResponse
+      description: Backfill serializer for responses in dry-run mode.
     EdgeResponse:
       properties:
         is_setup_teardown:
diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py 
b/airflow/api_fastapi/core_api/routes/public/backfills.py
index 61d25597cdd..4dae9ce3b47 100644
--- a/airflow/api_fastapi/core_api/routes/public/backfills.py
+++ b/airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -32,6 +32,8 @@ from airflow.api_fastapi.core_api.datamodels.backfills import 
(
     BackfillCollectionResponse,
     BackfillPostBody,
     BackfillResponse,
+    DryRunBackfillCollectionResponse,
+    DryRunBackfillResponse,
 )
 from airflow.api_fastapi.core_api.openapi.exceptions import (
     create_openapi_http_exception_doc,
@@ -42,6 +44,7 @@ from airflow.models.backfill import (
     Backfill,
     BackfillDagRun,
     _create_backfill,
+    _do_dry_run,
 )
 from airflow.utils import timezone
 from airflow.utils.state import DagRunState
@@ -206,3 +209,32 @@ def create_backfill(
             status_code=status.HTTP_409_CONFLICT,
             detail=f"There is already a running backfill for dag 
{backfill_request.dag_id}",
         )
+
+
+@backfills_router.post(
+    path="/dry_run",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+        ]
+    ),
+)
+def create_backfill_dry_run(
+    body: BackfillPostBody,
+    session: SessionDep,
+) -> DryRunBackfillCollectionResponse:
+    from_date = timezone.coerce_datetime(body.from_date)
+    to_date = timezone.coerce_datetime(body.to_date)
+
+    backfills_dry_run = _do_dry_run(
+        dag_id=body.dag_id,
+        from_date=from_date,
+        to_date=to_date,
+        reverse=body.run_backwards,
+        reprocess_behavior=body.reprocess_behavior,
+        session=session,
+    )
+    backfills = [DryRunBackfillResponse(logical_date=d) for d in 
backfills_dry_run]
+
+    return DryRunBackfillCollectionResponse(backfills=backfills, 
total_entries=len(backfills_dry_run))
diff --git a/airflow/cli/commands/remote_commands/backfill_command.py 
b/airflow/cli/commands/remote_commands/backfill_command.py
index 63a8573ab73..4256e6a8b36 100644
--- a/airflow/cli/commands/remote_commands/backfill_command.py
+++ b/airflow/cli/commands/remote_commands/backfill_command.py
@@ -21,33 +21,13 @@ import logging
 import signal
 
 from airflow import settings
-from airflow.models.backfill import ReprocessBehavior, _create_backfill, 
_get_info_list
-from airflow.models.serialized_dag import SerializedDagModel
+from airflow.models.backfill import ReprocessBehavior, _create_backfill, 
_do_dry_run
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import sigint_handler
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.session import create_session
 
 
-def _do_dry_run(*, params, dag_id, from_date, to_date, reverse):
-    print("Performing dry run of backfill.")
-    print("Printing params:")
-    for k, v in params.items():
-        print(f"    - {k} = {v}")
-    with create_session() as session:
-        serdag = session.get(SerializedDagModel, dag_id)
-
-    info_list = _get_info_list(
-        dag=serdag.dag,
-        from_date=from_date,
-        to_date=to_date,
-        reverse=reverse,
-    )
-    print("Logical dates to be attempted:")
-    for info in info_list:
-        print(f"    - {info.logical_date}")
-
-
 @cli_utils.action_cli
 @providers_configuration_loaded
 def create_backfill(args) -> None:
@@ -61,22 +41,33 @@ def create_backfill(args) -> None:
         reprocess_behavior = None
 
     if args.dry_run:
-        _do_dry_run(
-            params=dict(
-                dag_id=args.dag_id,
-                from_date=args.from_date,
-                to_date=args.to_date,
-                max_active_runs=args.max_active_runs,
-                reverse=args.run_backwards,
-                dag_run_conf=args.dag_run_conf,
-                reprocess_behavior=reprocess_behavior,
-            ),
+        print("Performing dry run of backfill.")
+        print("Printing params:")
+        params = dict(
             dag_id=args.dag_id,
             from_date=args.from_date,
             to_date=args.to_date,
+            max_active_runs=args.max_active_runs,
             reverse=args.run_backwards,
+            dag_run_conf=args.dag_run_conf,
+            reprocess_behavior=reprocess_behavior,
         )
+        for k, v in params.items():
+            print(f"    - {k} = {v}")
+        with create_session() as session:
+            logical_dates = _do_dry_run(
+                dag_id=args.dag_id,
+                from_date=args.from_date,
+                to_date=args.to_date,
+                reverse=args.reverse,
+                reprocess_behavior=args.reprocess_behavior,
+                session=session,
+            )
+        print("Logical dates to be attempted:")
+        for d in logical_dates:
+            print(f"    - {d}")
         return
+
     _create_backfill(
         dag_id=args.dag_id,
         from_date=args.from_date,
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 0e88fa15bb0..39a28379a65 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -155,6 +155,72 @@ class BackfillDagRun(Base):
         return val
 
 
+def _get_latest_dag_run_row_query(info, session):
+    from airflow.models import DagRun
+
+    return (
+        select(DagRun)
+        .where(DagRun.logical_date == info.logical_date)
+        .order_by(nulls_first(desc(DagRun.start_date), session=session))
+        .limit(1)
+    )
+
+
+def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) 
-> str | None:
+    non_create_reason = None
+    if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
+        non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
+    elif reprocess_behavior is ReprocessBehavior.NONE:
+        non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
+    elif reprocess_behavior is ReprocessBehavior.FAILED:
+        if dr.state != DagRunState.FAILED:
+            non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
+    return non_create_reason
+
+
+def _validate_backfill_params(dag, reverse, reprocess_behavior: 
ReprocessBehavior | None):
+    depends_on_past = None
+    depends_on_past = any(x.depends_on_past for x in dag.tasks)
+    if depends_on_past:
+        if reverse is True:
+            raise ValueError(
+                "Backfill cannot be run in reverse when the dag has tasks 
where depends_on_past=True"
+            )
+        if reprocess_behavior in (None, ReprocessBehavior.NONE):
+            raise ValueError(
+                "Dag has task for which depends_on_past is true. "
+                "You must set reprocess behavior to reprocess completed or "
+                "reprocess failed"
+            )
+
+
+def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, 
session) -> list[datetime]:
+    from airflow.models.serialized_dag import SerializedDagModel
+
+    serdag = 
session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
+    dag = serdag.dag
+    _validate_backfill_params(dag, reverse, reprocess_behavior)
+
+    dagrun_info_list = _get_info_list(
+        dag=dag,
+        from_date=from_date,
+        to_date=to_date,
+        reverse=reverse,
+    )
+    logical_dates = []
+    for info in dagrun_info_list:
+        dr = session.scalar(
+            statement=_get_latest_dag_run_row_query(info, session),
+        )
+        if dr:
+            non_create_reason = _get_dag_run_no_create_reason(dr, 
reprocess_behavior)
+            if not non_create_reason:
+                logical_dates.append(info.logical_date)
+        else:
+            logical_dates.append(info.logical_date)
+    return logical_dates
+
+
 def _create_backfill_dag_run(
     *,
     dag,
@@ -165,27 +231,15 @@ def _create_backfill_dag_run(
     backfill_sort_ordinal,
     session,
 ):
-    from airflow.models import DagRun
-
     with session.begin_nested() as nested:
         dr = session.scalar(
             with_row_locks(
-                select(DagRun)
-                .where(DagRun.logical_date == info.logical_date)
-                .order_by(nulls_first(desc(DagRun.start_date), 
session=session))
-                .limit(1),
+                query=_get_latest_dag_run_row_query(info, session),
                 session=session,
-            )
+            ),
         )
         if dr:
-            non_create_reason = None
-            if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
-                non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
-            elif reprocess_behavior is ReprocessBehavior.NONE:
-                non_create_reason = 
BackfillDagRunExceptionReason.ALREADY_EXISTS
-            elif reprocess_behavior is ReprocessBehavior.FAILED:
-                if dr.state != DagRunState.FAILED:
-                    non_create_reason = 
BackfillDagRunExceptionReason.ALREADY_EXISTS
+            non_create_reason = _get_dag_run_no_create_reason(dr, 
reprocess_behavior)
             if non_create_reason:
                 # rolling back here restores to start of this nested tran
                 # which releases the lock on the latest dag run, since we
@@ -272,18 +326,8 @@ def _create_backfill(
             )
 
         dag = serdag.dag
-        depends_on_past = any(x.depends_on_past for x in dag.tasks)
-        if depends_on_past:
-            if reverse is True:
-                raise ValueError(
-                    "Backfill cannot be run in reverse when the dag has tasks 
where depends_on_past=True"
-                )
-            if reprocess_behavior in (None, ReprocessBehavior.NONE):
-                raise ValueError(
-                    "Dag has task for which depends_on_past is true. "
-                    "You must set reprocess behavior to reprocess completed or 
"
-                    "reprocess failed"
-                )
+        _validate_backfill_params(dag, reverse, reprocess_behavior)
+
         br = Backfill(
             dag_id=dag_id,
             from_date=from_date,
@@ -316,6 +360,7 @@ def _create_backfill(
         )
         if not dag_model:
             raise RuntimeError(f"Dag {dag_id} not found")
+
         for info in dagrun_info_list:
             backfill_sort_ordinal += 1
             _create_backfill_dag_run(
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index 8335d80e698..fea6805d40e 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1596,6 +1596,9 @@ export type AssetServiceCreateAssetEventMutationResult = 
Awaited<
 export type BackfillServiceCreateBackfillMutationResult = Awaited<
   ReturnType<typeof BackfillService.createBackfill>
 >;
+export type BackfillServiceCreateBackfillDryRunMutationResult = Awaited<
+  ReturnType<typeof BackfillService.createBackfillDryRun>
+>;
 export type ConnectionServicePostConnectionMutationResult = Awaited<
   ReturnType<typeof ConnectionService.postConnection>
 >;
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index 9a816460704..801387eebb8 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -2710,6 +2710,42 @@ export const useBackfillServiceCreateBackfill = <
       BackfillService.createBackfill({ requestBody }) as unknown as 
Promise<TData>,
     ...options,
   });
+/**
+ * Create Backfill Dry Run
+ * @param data The data for the request.
+ * @param data.requestBody
+ * @returns DryRunBackfillCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useBackfillServiceCreateBackfillDryRun = <
+  TData = Common.BackfillServiceCreateBackfillDryRunMutationResult,
+  TError = unknown,
+  TContext = unknown,
+>(
+  options?: Omit<
+    UseMutationOptions<
+      TData,
+      TError,
+      {
+        requestBody: BackfillPostBody;
+      },
+      TContext
+    >,
+    "mutationFn"
+  >,
+) =>
+  useMutation<
+    TData,
+    TError,
+    {
+      requestBody: BackfillPostBody;
+    },
+    TContext
+  >({
+    mutationFn: ({ requestBody }) =>
+      BackfillService.createBackfillDryRun({ requestBody }) as unknown as 
Promise<TData>,
+    ...options,
+  });
 /**
  * Post Connection
  * Create connection entry.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 64a8fcc9e81..443aef97a33 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2640,6 +2640,40 @@ This is the set of allowable values for the 
\`\`warning_type\`\` field
 in the DagWarning model.`,
 } as const;
 
+export const $DryRunBackfillCollectionResponse = {
+  properties: {
+    backfills: {
+      items: {
+        $ref: "#/components/schemas/DryRunBackfillResponse",
+      },
+      type: "array",
+      title: "Backfills",
+    },
+    total_entries: {
+      type: "integer",
+      title: "Total Entries",
+    },
+  },
+  type: "object",
+  required: ["backfills", "total_entries"],
+  title: "DryRunBackfillCollectionResponse",
+  description: "Backfill collection serializer for responses in dry-run mode.",
+} as const;
+
+export const $DryRunBackfillResponse = {
+  properties: {
+    logical_date: {
+      type: "string",
+      format: "date-time",
+      title: "Logical Date",
+    },
+  },
+  type: "object",
+  required: ["logical_date"],
+  title: "DryRunBackfillResponse",
+  description: "Backfill serializer for responses in dry-run mode.",
+} as const;
+
 export const $EdgeResponse = {
   properties: {
     is_setup_teardown: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 01666ae0904..c2a5a3ce6ec 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -54,6 +54,8 @@ import type {
   UnpauseBackfillResponse,
   CancelBackfillData,
   CancelBackfillResponse,
+  CreateBackfillDryRunData,
+  CreateBackfillDryRunResponse,
   GridDataData,
   GridDataResponse,
   DeleteConnectionData,
@@ -927,6 +929,31 @@ export class BackfillService {
       },
     });
   }
+
+  /**
+   * Create Backfill Dry Run
+   * @param data The data for the request.
+   * @param data.requestBody
+   * @returns DryRunBackfillCollectionResponse Successful Response
+   * @throws ApiError
+   */
+  public static createBackfillDryRun(
+    data: CreateBackfillDryRunData,
+  ): CancelablePromise<CreateBackfillDryRunResponse> {
+    return __request(OpenAPI, {
+      method: "POST",
+      url: "/public/backfills/dry_run",
+      body: data.requestBody,
+      mediaType: "application/json",
+      errors: {
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        409: "Conflict",
+        422: "Validation Error",
+      },
+    });
+  }
 }
 
 export class GridService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 232e66d2464..f2f709b02db 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -637,6 +637,21 @@ export type DagTagResponse = {
  */
 export type DagWarningType = "asset conflict" | "non-existent pool";
 
+/**
+ * Backfill collection serializer for responses in dry-run mode.
+ */
+export type DryRunBackfillCollectionResponse = {
+  backfills: Array<DryRunBackfillResponse>;
+  total_entries: number;
+};
+
+/**
+ * Backfill serializer for responses in dry-run mode.
+ */
+export type DryRunBackfillResponse = {
+  logical_date: string;
+};
+
 /**
  * Edge serializer for responses.
  */
@@ -1552,6 +1567,12 @@ export type CancelBackfillData = {
 
 export type CancelBackfillResponse = BackfillResponse;
 
+export type CreateBackfillDryRunData = {
+  requestBody: BackfillPostBody;
+};
+
+export type CreateBackfillDryRunResponse = DryRunBackfillCollectionResponse;
+
 export type GridDataData = {
   dagId: string;
   includeDownstream?: boolean;
@@ -2819,6 +2840,37 @@ export type $OpenApiTs = {
       };
     };
   };
+  "/public/backfills/dry_run": {
+    post: {
+      req: CreateBackfillDryRunData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: DryRunBackfillCollectionResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Conflict
+         */
+        409: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/ui/grid/{dag_id}": {
     get: {
       req: GridDataData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py 
b/tests/api_fastapi/core_api/routes/public/test_backfills.py
index 1c64b10848f..4f0ae7918e4 100644
--- a/tests/api_fastapi/core_api/routes/public/test_backfills.py
+++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py
@@ -192,6 +192,7 @@ class TestCreateBackfill(TestBackfillEndpoint):
             "max_active_runs": max_active_runs,
             "run_backwards": False,
             "dag_run_conf": {"param1": "val1", "param2": True},
+            "dry_run": False,
         }
         if repro_act is not None:
             data["reprocess_behavior"] = repro_act
@@ -215,6 +216,93 @@ class TestCreateBackfill(TestBackfillEndpoint):
         }
 
 
+class TestCreateBackfillDryRun(TestBackfillEndpoint):
+    @pytest.mark.parametrize(
+        "reprocess_behavior, expected_dates",
+        [
+            (
+                "none",
+                [
+                    {"logical_date": "2024-01-01T00:00:00Z"},
+                    {"logical_date": "2024-01-04T00:00:00Z"},
+                    {"logical_date": "2024-01-05T00:00:00Z"},
+                ],
+            ),
+            (
+                "failed",
+                [
+                    {"logical_date": "2024-01-01T00:00:00Z"},
+                    {"logical_date": "2024-01-03T00:00:00Z"},  # Reprocess 
failed
+                    {"logical_date": "2024-01-04T00:00:00Z"},
+                    {"logical_date": "2024-01-05T00:00:00Z"},
+                ],
+            ),
+            (
+                "completed",
+                [
+                    {"logical_date": "2024-01-01T00:00:00Z"},
+                    {"logical_date": "2024-01-02T00:00:00Z"},  # Reprocess all
+                    {"logical_date": "2024-01-03T00:00:00Z"},
+                    {"logical_date": "2024-01-04T00:00:00Z"},
+                    {"logical_date": "2024-01-05T00:00:00Z"},
+                ],
+            ),
+        ],
+    )
+    def test_create_backfill_dry_run(
+        self, session, dag_maker, test_client, reprocess_behavior, 
expected_dates
+    ):
+        with dag_maker(
+            session=session,
+            dag_id="TEST_DAG_2",
+            schedule="0 0 * * *",
+            start_date=pendulum.parse("2024-01-01"),
+        ) as dag:
+            EmptyOperator(task_id="mytask")
+
+        session.commit()
+
+        existing_dagruns = [
+            {"logical_date": pendulum.parse("2024-01-02"), "state": 
DagRunState.SUCCESS},  # Completed dag run
+            {"logical_date": pendulum.parse("2024-01-03"), "state": 
DagRunState.FAILED},  # Failed dag run
+        ]
+        for dagrun in existing_dagruns:
+            session.add(
+                DagRun(
+                    dag_id=dag.dag_id,
+                    run_id=f"manual__{dagrun['logical_date'].isoformat()}",
+                    logical_date=dagrun["logical_date"],
+                    state=dagrun["state"],
+                    run_type="scheduled",
+                )
+            )
+        session.commit()
+
+        from_date = pendulum.parse("2024-01-01")
+        from_date_iso = to_iso(from_date)
+        to_date = pendulum.parse("2024-01-05")
+        to_date_iso = to_iso(to_date)
+
+        data = {
+            "dag_id": dag.dag_id,
+            "from_date": from_date_iso,
+            "to_date": to_date_iso,
+            "max_active_runs": 5,
+            "run_backwards": False,
+            "dag_run_conf": {"param1": "val1", "param2": True},
+            "reprocess_behavior": reprocess_behavior,
+        }
+
+        response = test_client.post(
+            url="/public/backfills/dry_run",
+            json=data,
+        )
+
+        assert response.status_code == 200
+        response_json = response.json()
+        assert response_json["backfills"] == expected_dates
+
+
 class TestCancelBackfill(TestBackfillEndpoint):
     def test_cancel_backfill(self, session, test_client):
         (dag,) = self._create_dag_models()

Reply via email to