This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b541f3c15 Add support for run conf to backfill (#42865)
2b541f3c15 is described below

commit 2b541f3c1513e1682816ad3a56b4250f3259c1c6
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Oct 10 08:16:01 2024 -0700

    Add support for run conf to backfill (#42865)
    
    This actually does a little more: it changes the backfill create endpoint
    to take a JSON payload instead of query parameters. This is simpler because
    we can use the backfill schema as the schema of the request body.
    
    One thing that may seem unusual is that I added a decorator that translates
    the request body into kwargs for the endpoint function. The main motivation
    was compatibility with the requires_access_dag decorator, which does not
    inspect the request body.
---
 .../api_connexion/endpoints/backfill_endpoint.py   | 39 +++++++++++---
 airflow/api_connexion/openapi/v1.yaml              | 59 +++-------------------
 airflow/api_connexion/schemas/backfill_schema.py   | 17 ++++---
 airflow/models/backfill.py                         |  7 +--
 .../api_endpoints/test_backfill_endpoint.py        | 20 ++++----
 .../endpoints/test_backfill_endpoint.py            | 23 ++++-----
 tests/models/test_backfill.py                      |  4 +-
 7 files changed, 74 insertions(+), 95 deletions(-)

diff --git a/airflow/api_connexion/endpoints/backfill_endpoint.py 
b/airflow/api_connexion/endpoints/backfill_endpoint.py
index a0e728c5bc..94d6ad21f0 100644
--- a/airflow/api_connexion/endpoints/backfill_endpoint.py
+++ b/airflow/api_connexion/endpoints/backfill_endpoint.py
@@ -19,14 +19,14 @@ from __future__ import annotations
 
 import logging
 from functools import wraps
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING
 
-import pendulum
-from pendulum import DateTime
+from flask import request
+from marshmallow import ValidationError
 from sqlalchemy import select
 
 from airflow.api_connexion import security
-from airflow.api_connexion.exceptions import Conflict, NotFound
+from airflow.api_connexion.exceptions import BadRequest, Conflict, NotFound
 from airflow.api_connexion.schemas.backfill_schema import (
     BackfillCollection,
     backfill_collection_schema,
@@ -42,6 +42,8 @@ from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.www.decorators import action_logging
 
 if TYPE_CHECKING:
+    from datetime import datetime
+
     from sqlalchemy.orm import Session
 
     from airflow.api_connexion.types import APIResponse
@@ -119,12 +121,33 @@ def get_backfill(*, backfill_id: int, session: Session = 
NEW_SESSION, **kwargs):
     raise NotFound("Backfill not found")
 
 
+def backfill_obj_to_kwargs(f):
+    """
+    Convert the request body (containing backfill object json) to kwargs.
+
+    The main point here is to be compatible with the ``requires_access_dag`` 
decorator,
+    which takes dag_id kwarg and doesn't support json request body.
+    """
+
+    @wraps(f)
+    def inner():
+        body = request.json
+        try:
+            obj = backfill_schema.load(body)
+        except ValidationError as err:
+            raise BadRequest(detail=str(err.messages))
+        return f(**obj)
+
+    return inner
+
+
+@backfill_obj_to_kwargs
 @security.requires_access_dag("PUT")
 @action_logging
 def create_backfill(
     dag_id: str,
-    from_date: str,
-    to_date: str,
+    from_date: datetime,
+    to_date: datetime,
     max_active_runs: int = 10,
     reverse: bool = False,
     dag_run_conf: dict | None = None,
@@ -132,8 +155,8 @@ def create_backfill(
     try:
         backfill_obj = _create_backfill(
             dag_id=dag_id,
-            from_date=cast(DateTime, pendulum.parse(from_date)),
-            to_date=cast(DateTime, pendulum.parse(to_date)),
+            from_date=from_date,
+            to_date=to_date,
             max_active_runs=max_active_runs,
             reverse=reverse,
             dag_run_conf=dag_run_conf,
diff --git a/airflow/api_connexion/openapi/v1.yaml 
b/airflow/api_connexion/openapi/v1.yaml
index 305653e781..b39d1cd955 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -277,57 +277,12 @@ paths:
       x-openapi-router-controller: 
airflow.api_connexion.endpoints.backfill_endpoint
       operationId: create_backfill
       tags: [Backfill]
-      parameters:
-        - name: dag_id
-          in: query
-          schema:
-            type: string
-          required: true
-          description: |
-            Create dag runs for this dag.
-
-        - name: from_date
-          in: query
-          schema:
-            type: string
-            format: date-time
-          required: true
-          description: |
-            Create dag runs with logical dates from this date onward, 
including this date.
-
-        - name: to_date
-          in: query
-          schema:
-            type: string
-            format: date-time
-          required: true
-          description: |
-            Create dag runs for logical dates up to but not including this 
date.
-
-        - name: max_active_runs
-          in: query
-          schema:
-            type: integer
-          required: false
-          description: |
-            Maximum number of active DAG runs for the the backfill.
-
-        - name: reverse
-          in: query
-          schema:
-            type: boolean
-          required: false
-          description: |
-            If true, run the dag runs in descending order of logical date.
-
-        - name: config
-          in: query
-          schema:
-            # todo: AIP-78 make this object
-            type: string
-          required: false
-          description: |
-            If true, run the dag runs in descending order of logical date.
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: "#/components/schemas/Backfill"
       responses:
         "200":
           description: Success.
@@ -2920,7 +2875,7 @@ components:
           nullable: true
           description: To date of the backfill (exclusive).
         dag_run_conf:
-          type: string
+          type: object
           nullable: true
           description: Dag run conf to be forwarded to the dag runs.
         is_paused:
diff --git a/airflow/api_connexion/schemas/backfill_schema.py 
b/airflow/api_connexion/schemas/backfill_schema.py
index 7f83d76df6..db496bf1ac 100644
--- a/airflow/api_connexion/schemas/backfill_schema.py
+++ b/airflow/api_connexion/schemas/backfill_schema.py
@@ -34,15 +34,16 @@ class BackfillSchema(SQLAlchemySchema):
         model = Backfill
 
     id = auto_field(dump_only=True)
-    dag_id = auto_field(dump_only=True)
-    from_date = auto_field(dump_only=True)
-    to_date = auto_field(dump_only=True)
+    dag_id = auto_field()
+    from_date = auto_field()
+    to_date = auto_field()
     dag_run_conf = fields.Dict(allow_none=True)
-    is_paused = auto_field(dump_only=True)
-    max_active_runs = auto_field(dump_only=True)
-    created_at = auto_field(dump_only=True)
-    completed_at = auto_field(dump_only=True)
-    updated_at = auto_field(dump_only=True)
+    reverse = fields.Boolean()
+    is_paused = auto_field()
+    max_active_runs = auto_field()
+    created_at = auto_field()
+    completed_at = auto_field()
+    updated_at = auto_field()
 
 
 class BackfillDagRunSchema(SQLAlchemySchema):
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 37683ee6f1..aa9cb695b7 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -41,7 +41,8 @@ from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 if TYPE_CHECKING:
-    from pendulum import DateTime
+    from datetime import datetime
+
 
 log = logging.getLogger(__name__)
 
@@ -121,8 +122,8 @@ class BackfillDagRun(Base):
 def _create_backfill(
     *,
     dag_id: str,
-    from_date: DateTime,
-    to_date: DateTime,
+    from_date: datetime,
+    to_date: datetime,
     max_active_runs: int,
     reverse: bool,
     dag_run_conf: dict | None,
diff --git 
a/providers/tests/fab/auth_manager/api_endpoints/test_backfill_endpoint.py 
b/providers/tests/fab/auth_manager/api_endpoints/test_backfill_endpoint.py
index 9d9a79af51..f8015ff907 100644
--- a/providers/tests/fab/auth_manager/api_endpoints/test_backfill_endpoint.py
+++ b/providers/tests/fab/auth_manager/api_endpoints/test_backfill_endpoint.py
@@ -19,7 +19,6 @@ from __future__ import annotations
 import os
 from datetime import datetime
 from unittest import mock
-from urllib.parse import urlencode
 
 import pendulum
 import pytest
@@ -197,21 +196,20 @@ class TestCreateBackfill(TestBackfillEndpoint):
         to_date = pendulum.parse("2024-02-01")
         to_date_iso = to_date.isoformat()
         max_active_runs = 5
-        query = urlencode(
-            query={
-                "dag_id": dag.dag_id,
-                "from_date": f"{from_date_iso}",
-                "to_date": f"{to_date_iso}",
-                "max_active_runs": max_active_runs,
-                "reverse": False,
-            }
-        )
+        data = {
+            "dag_id": dag.dag_id,
+            "from_date": f"{from_date_iso}",
+            "to_date": f"{to_date_iso}",
+            "max_active_runs": max_active_runs,
+            "reverse": False,
+        }
         kwargs = {}
         kwargs.update(environ_overrides={"REMOTE_USER": 
"test_granular_permissions"})
 
         response = self.client.post(
-            f"/api/v1/backfills?{query}",
+            "/api/v1/backfills",
             **kwargs,
+            json=data,
         )
         assert response.status_code == 200
         assert response.json == {
diff --git a/tests/api_connexion/endpoints/test_backfill_endpoint.py 
b/tests/api_connexion/endpoints/test_backfill_endpoint.py
index 67ec6316e2..5bb959caba 100644
--- a/tests/api_connexion/endpoints/test_backfill_endpoint.py
+++ b/tests/api_connexion/endpoints/test_backfill_endpoint.py
@@ -19,7 +19,6 @@ from __future__ import annotations
 import os
 from datetime import datetime
 from unittest import mock
-from urllib.parse import urlencode
 
 import pendulum
 import pytest
@@ -272,21 +271,21 @@ class TestCreateBackfill(TestBackfillEndpoint):
         to_date = pendulum.parse("2024-02-01")
         to_date_iso = to_date.isoformat()
         max_active_runs = 5
-        query = urlencode(
-            query={
-                "dag_id": dag.dag_id,
-                "from_date": f"{from_date_iso}",
-                "to_date": f"{to_date_iso}",
-                "max_active_runs": max_active_runs,
-                "reverse": False,
-            }
-        )
+        data = {
+            "dag_id": dag.dag_id,
+            "from_date": f"{from_date_iso}",
+            "to_date": f"{to_date_iso}",
+            "max_active_runs": max_active_runs,
+            "reverse": False,
+            "dag_run_conf": {"param1": "val1", "param2": True},
+        }
         kwargs = {}
         if user:
             kwargs.update(environ_overrides={"REMOTE_USER": user})
 
         response = self.client.post(
-            f"/api/v1/backfills?{query}",
+            "/api/v1/backfills",
+            json=data,
             **kwargs,
         )
         assert response.status_code == expected
@@ -295,7 +294,7 @@ class TestCreateBackfill(TestBackfillEndpoint):
                 "completed_at": mock.ANY,
                 "created_at": mock.ANY,
                 "dag_id": "TEST_DAG_1",
-                "dag_run_conf": None,
+                "dag_run_conf": {"param1": "val1", "param2": True},
                 "from_date": from_date_iso,
                 "id": mock.ANY,
                 "is_paused": False,
diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py
index 0f471fbd56..06c41cadd8 100644
--- a/tests/models/test_backfill.py
+++ b/tests/models/test_backfill.py
@@ -93,13 +93,14 @@ def test_create_backfill_simple(reverse, dag_maker, 
session):
     """
     with dag_maker(schedule="@daily") as dag:
         PythonOperator(task_id="hi", python_callable=print)
+    expected_run_conf = {"param1": "valABC"}
     b = _create_backfill(
         dag_id=dag.dag_id,
         from_date=pendulum.parse("2021-01-01"),
         to_date=pendulum.parse("2021-01-05"),
         max_active_runs=2,
         reverse=reverse,
-        dag_run_conf={},
+        dag_run_conf=expected_run_conf,
     )
     query = (
         select(DagRun)
@@ -114,6 +115,7 @@ def test_create_backfill_simple(reverse, dag_maker, 
session):
         expected_dates = list(reversed(expected_dates))
     assert dates == expected_dates
     assert all(x.state == DagRunState.QUEUED for x in dag_runs)
+    assert all(x.conf == expected_run_conf for x in dag_runs)
 
 
 def test_params_stored_correctly(dag_maker, session):

Reply via email to