This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2b541f3c15 Add support for run conf to backfill (#42865)
2b541f3c15 is described below
commit 2b541f3c1513e1682816ad3a56b4250f3259c1c6
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Oct 10 08:16:01 2024 -0700
Add support for run conf to backfill (#42865)
This commit also goes a bit further: the backfill create endpoint now
accepts a JSON request body instead of query parameters. This is simpler
because the backfill schema can double as the schema of the request body.
One possibly surprising choice is a new decorator that translates the
request body into keyword arguments for the endpoint function. The main
motivation is compatibility with the requires_access_dag decorator, which
reads dag_id from the function's kwargs and does not inspect the request body.
---
.../api_connexion/endpoints/backfill_endpoint.py | 39 +++++++++++---
airflow/api_connexion/openapi/v1.yaml | 59 +++-------------------
airflow/api_connexion/schemas/backfill_schema.py | 17 ++++---
airflow/models/backfill.py | 7 +--
.../api_endpoints/test_backfill_endpoint.py | 20 ++++----
.../endpoints/test_backfill_endpoint.py | 23 ++++-----
tests/models/test_backfill.py | 4 +-
7 files changed, 74 insertions(+), 95 deletions(-)
diff --git a/airflow/api_connexion/endpoints/backfill_endpoint.py
b/airflow/api_connexion/endpoints/backfill_endpoint.py
index a0e728c5bc..94d6ad21f0 100644
--- a/airflow/api_connexion/endpoints/backfill_endpoint.py
+++ b/airflow/api_connexion/endpoints/backfill_endpoint.py
@@ -19,14 +19,14 @@ from __future__ import annotations
import logging
from functools import wraps
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING
-import pendulum
-from pendulum import DateTime
+from flask import request
+from marshmallow import ValidationError
from sqlalchemy import select
from airflow.api_connexion import security
-from airflow.api_connexion.exceptions import Conflict, NotFound
+from airflow.api_connexion.exceptions import BadRequest, Conflict, NotFound
from airflow.api_connexion.schemas.backfill_schema import (
BackfillCollection,
backfill_collection_schema,
@@ -42,6 +42,8 @@ from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.decorators import action_logging
if TYPE_CHECKING:
+ from datetime import datetime
+
from sqlalchemy.orm import Session
from airflow.api_connexion.types import APIResponse
@@ -119,12 +121,33 @@ def get_backfill(*, backfill_id: int, session: Session =
NEW_SESSION, **kwargs):
raise NotFound("Backfill not found")
+def backfill_obj_to_kwargs(f):
+ """
+ Convert the request body (containing backfill object json) to kwargs.
+
+ The main point here is to be compatible with the ``requires_access_dag``
decorator,
+ which takes dag_id kwarg and doesn't support json request body.
+ """
+
+ @wraps(f)
+ def inner():
+ body = request.json
+ try:
+ obj = backfill_schema.load(body)
+ except ValidationError as err:
+ raise BadRequest(detail=str(err.messages))
+ return f(**obj)
+
+ return inner
+
+
+@backfill_obj_to_kwargs
@security.requires_access_dag("PUT")
@action_logging
def create_backfill(
dag_id: str,
- from_date: str,
- to_date: str,
+ from_date: datetime,
+ to_date: datetime,
max_active_runs: int = 10,
reverse: bool = False,
dag_run_conf: dict | None = None,
@@ -132,8 +155,8 @@ def create_backfill(
try:
backfill_obj = _create_backfill(
dag_id=dag_id,
- from_date=cast(DateTime, pendulum.parse(from_date)),
- to_date=cast(DateTime, pendulum.parse(to_date)),
+ from_date=from_date,
+ to_date=to_date,
max_active_runs=max_active_runs,
reverse=reverse,
dag_run_conf=dag_run_conf,
diff --git a/airflow/api_connexion/openapi/v1.yaml
b/airflow/api_connexion/openapi/v1.yaml
index 305653e781..b39d1cd955 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -277,57 +277,12 @@ paths:
x-openapi-router-controller:
airflow.api_connexion.endpoints.backfill_endpoint
operationId: create_backfill
tags: [Backfill]
- parameters:
- - name: dag_id
- in: query
- schema:
- type: string
- required: true
- description: |
- Create dag runs for this dag.
-
- - name: from_date
- in: query
- schema:
- type: string
- format: date-time
- required: true
- description: |
- Create dag runs with logical dates from this date onward,
including this date.
-
- - name: to_date
- in: query
- schema:
- type: string
- format: date-time
- required: true
- description: |
- Create dag runs for logical dates up to but not including this
date.
-
- - name: max_active_runs
- in: query
- schema:
- type: integer
- required: false
- description: |
- Maximum number of active DAG runs for the the backfill.
-
- - name: reverse
- in: query
- schema:
- type: boolean
- required: false
- description: |
- If true, run the dag runs in descending order of logical date.
-
- - name: config
- in: query
- schema:
- # todo: AIP-78 make this object
- type: string
- required: false
- description: |
- If true, run the dag runs in descending order of logical date.
+ requestBody:
+ required: true
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/Backfill"
responses:
"200":
description: Success.
@@ -2920,7 +2875,7 @@ components:
nullable: true
description: To date of the backfill (exclusive).
dag_run_conf:
- type: string
+ type: object
nullable: true
description: Dag run conf to be forwarded to the dag runs.
is_paused:
diff --git a/airflow/api_connexion/schemas/backfill_schema.py
b/airflow/api_connexion/schemas/backfill_schema.py
index 7f83d76df6..db496bf1ac 100644
--- a/airflow/api_connexion/schemas/backfill_schema.py
+++ b/airflow/api_connexion/schemas/backfill_schema.py
@@ -34,15 +34,16 @@ class BackfillSchema(SQLAlchemySchema):
model = Backfill
id = auto_field(dump_only=True)
- dag_id = auto_field(dump_only=True)
- from_date = auto_field(dump_only=True)
- to_date = auto_field(dump_only=True)
+ dag_id = auto_field()
+ from_date = auto_field()
+ to_date = auto_field()
dag_run_conf = fields.Dict(allow_none=True)
- is_paused = auto_field(dump_only=True)
- max_active_runs = auto_field(dump_only=True)
- created_at = auto_field(dump_only=True)
- completed_at = auto_field(dump_only=True)
- updated_at = auto_field(dump_only=True)
+ reverse = fields.Boolean()
+ is_paused = auto_field()
+ max_active_runs = auto_field()
+ created_at = auto_field()
+ completed_at = auto_field()
+ updated_at = auto_field()
class BackfillDagRunSchema(SQLAlchemySchema):
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 37683ee6f1..aa9cb695b7 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -41,7 +41,8 @@ from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
if TYPE_CHECKING:
- from pendulum import DateTime
+ from datetime import datetime
+
log = logging.getLogger(__name__)
@@ -121,8 +122,8 @@ class BackfillDagRun(Base):
def _create_backfill(
*,
dag_id: str,
- from_date: DateTime,
- to_date: DateTime,
+ from_date: datetime,
+ to_date: datetime,
max_active_runs: int,
reverse: bool,
dag_run_conf: dict | None,
diff --git
a/providers/tests/fab/auth_manager/api_endpoints/test_backfill_endpoint.py
b/providers/tests/fab/auth_manager/api_endpoints/test_backfill_endpoint.py
index 9d9a79af51..f8015ff907 100644
--- a/providers/tests/fab/auth_manager/api_endpoints/test_backfill_endpoint.py
+++ b/providers/tests/fab/auth_manager/api_endpoints/test_backfill_endpoint.py
@@ -19,7 +19,6 @@ from __future__ import annotations
import os
from datetime import datetime
from unittest import mock
-from urllib.parse import urlencode
import pendulum
import pytest
@@ -197,21 +196,20 @@ class TestCreateBackfill(TestBackfillEndpoint):
to_date = pendulum.parse("2024-02-01")
to_date_iso = to_date.isoformat()
max_active_runs = 5
- query = urlencode(
- query={
- "dag_id": dag.dag_id,
- "from_date": f"{from_date_iso}",
- "to_date": f"{to_date_iso}",
- "max_active_runs": max_active_runs,
- "reverse": False,
- }
- )
+ data = {
+ "dag_id": dag.dag_id,
+ "from_date": f"{from_date_iso}",
+ "to_date": f"{to_date_iso}",
+ "max_active_runs": max_active_runs,
+ "reverse": False,
+ }
kwargs = {}
kwargs.update(environ_overrides={"REMOTE_USER":
"test_granular_permissions"})
response = self.client.post(
- f"/api/v1/backfills?{query}",
+ "/api/v1/backfills",
**kwargs,
+ json=data,
)
assert response.status_code == 200
assert response.json == {
diff --git a/tests/api_connexion/endpoints/test_backfill_endpoint.py
b/tests/api_connexion/endpoints/test_backfill_endpoint.py
index 67ec6316e2..5bb959caba 100644
--- a/tests/api_connexion/endpoints/test_backfill_endpoint.py
+++ b/tests/api_connexion/endpoints/test_backfill_endpoint.py
@@ -19,7 +19,6 @@ from __future__ import annotations
import os
from datetime import datetime
from unittest import mock
-from urllib.parse import urlencode
import pendulum
import pytest
@@ -272,21 +271,21 @@ class TestCreateBackfill(TestBackfillEndpoint):
to_date = pendulum.parse("2024-02-01")
to_date_iso = to_date.isoformat()
max_active_runs = 5
- query = urlencode(
- query={
- "dag_id": dag.dag_id,
- "from_date": f"{from_date_iso}",
- "to_date": f"{to_date_iso}",
- "max_active_runs": max_active_runs,
- "reverse": False,
- }
- )
+ data = {
+ "dag_id": dag.dag_id,
+ "from_date": f"{from_date_iso}",
+ "to_date": f"{to_date_iso}",
+ "max_active_runs": max_active_runs,
+ "reverse": False,
+ "dag_run_conf": {"param1": "val1", "param2": True},
+ }
kwargs = {}
if user:
kwargs.update(environ_overrides={"REMOTE_USER": user})
response = self.client.post(
- f"/api/v1/backfills?{query}",
+ "/api/v1/backfills",
+ json=data,
**kwargs,
)
assert response.status_code == expected
@@ -295,7 +294,7 @@ class TestCreateBackfill(TestBackfillEndpoint):
"completed_at": mock.ANY,
"created_at": mock.ANY,
"dag_id": "TEST_DAG_1",
- "dag_run_conf": None,
+ "dag_run_conf": {"param1": "val1", "param2": True},
"from_date": from_date_iso,
"id": mock.ANY,
"is_paused": False,
diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py
index 0f471fbd56..06c41cadd8 100644
--- a/tests/models/test_backfill.py
+++ b/tests/models/test_backfill.py
@@ -93,13 +93,14 @@ def test_create_backfill_simple(reverse, dag_maker,
session):
"""
with dag_maker(schedule="@daily") as dag:
PythonOperator(task_id="hi", python_callable=print)
+ expected_run_conf = {"param1": "valABC"}
b = _create_backfill(
dag_id=dag.dag_id,
from_date=pendulum.parse("2021-01-01"),
to_date=pendulum.parse("2021-01-05"),
max_active_runs=2,
reverse=reverse,
- dag_run_conf={},
+ dag_run_conf=expected_run_conf,
)
query = (
select(DagRun)
@@ -114,6 +115,7 @@ def test_create_backfill_simple(reverse, dag_maker,
session):
expected_dates = list(reversed(expected_dates))
assert dates == expected_dates
assert all(x.state == DagRunState.QUEUED for x in dag_runs)
+ assert all(x.conf == expected_run_conf for x in dag_runs)
def test_params_stored_correctly(dag_maker, session):