This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 32ec3d371b Add schemas for backfill endpoint (#42278)
32ec3d371b is described below
commit 32ec3d371baa0fb34a9260de3daec2c8c5376e55
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Sep 18 14:55:23 2024 -0700
Add schemas for backfill endpoint (#42278)
Part of AIP-78. Needed for endpoint.
---
airflow/api_connexion/schemas/backfill_schema.py | 78 ++++++++++++++++++++++
airflow/models/__init__.py | 2 -
.../api_connexion/schemas/test_backfill_schema.py | 54 +++++++++++++++
3 files changed, 132 insertions(+), 2 deletions(-)
diff --git a/airflow/api_connexion/schemas/backfill_schema.py b/airflow/api_connexion/schemas/backfill_schema.py
new file mode 100644
index 0000000000..7f83d76df6
--- /dev/null
+++ b/airflow/api_connexion/schemas/backfill_schema.py
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import NamedTuple
+
+from marshmallow import Schema, fields
+from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
+
+from airflow.models.backfill import Backfill, BackfillDagRun
+
+
+class BackfillSchema(SQLAlchemySchema):
+    """Backfill Schema."""
+
+    class Meta:
+        """Meta."""
+
+        model = Backfill
+
+    id = auto_field(dump_only=True)
+    dag_id = auto_field(dump_only=True)
+    from_date = auto_field(dump_only=True)
+    to_date = auto_field(dump_only=True)
+    dag_run_conf = fields.Dict(allow_none=True)
+    is_paused = auto_field(dump_only=True)
+    max_active_runs = auto_field(dump_only=True)
+    created_at = auto_field(dump_only=True)
+    completed_at = auto_field(dump_only=True)
+    updated_at = auto_field(dump_only=True)
+
+
+class BackfillDagRunSchema(SQLAlchemySchema):
+    """Backfill DAG Run Schema."""
+
+    class Meta:
+        """Meta."""
+
+        model = BackfillDagRun
+
+    id = auto_field(dump_only=True)
+    backfill_id = auto_field(dump_only=True)
+    dag_run_id = auto_field(dump_only=True)
+    sort_ordinal = auto_field(dump_only=True)
+
+
+class BackfillCollection(NamedTuple):
+    """List of Backfills with meta."""
+
+    backfills: list[Backfill]
+    total_entries: int
+
+
+class BackfillCollectionSchema(Schema):
+    """Backfill Collection Schema."""
+
+    backfills = fields.List(fields.Nested(BackfillSchema))
+    total_entries = fields.Int()
+
+
+backfill_schema = BackfillSchema()
+backfill_dag_run_schema = BackfillDagRunSchema()
+backfill_collection_schema = BackfillCollectionSchema()
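
For readers following AIP-78, here is a minimal usage sketch (not part of this patch; the Backfill field values are invented) of how the module-level schema instances above could serialize ORM objects for the endpoint:

    from airflow.api_connexion.schemas.backfill_schema import (
        BackfillCollection,
        backfill_collection_schema,
        backfill_schema,
    )
    from airflow.models.backfill import Backfill
    from airflow.utils import timezone

    now = timezone.utcnow()
    # Illustrative object; field values here are made up for the example.
    b = Backfill(dag_id="example_dag", from_date=now, to_date=now)

    single = backfill_schema.dump(b)  # -> dict with dag_id, from_date, to_date, ...
    collection = backfill_collection_schema.dump(
        BackfillCollection(backfills=[b], total_entries=1)
    )  # -> {"backfills": [...], "total_entries": 1}
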
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index f93abc46d1..7bf23e1bbb 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -66,7 +66,6 @@ def import_all_models():
     import airflow.models.taskinstancehistory
     import airflow.models.tasklog
     import airflow.providers.fab.auth_manager.models
-    from airflow.models.backfill import Backfill, BackfillDagRun


 def __getattr__(name):
@@ -118,7 +117,6 @@ __lazy_imports = {
 if TYPE_CHECKING:
     # I was unable to get mypy to respect a airflow/models/__init__.pyi, so
     # having to resort back to this hacky method
-    from airflow.models.backfill import Backfill, BackfillDagRun
     from airflow.models.base import ID_LEN, Base
     from airflow.models.baseoperator import BaseOperator
     from airflow.models.baseoperatorlink import BaseOperatorLink
diff --git a/tests/api_connexion/schemas/test_backfill_schema.py b/tests/api_connexion/schemas/test_backfill_schema.py
new file mode 100644
index 0000000000..c20714e968
--- /dev/null
+++ b/tests/api_connexion/schemas/test_backfill_schema.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.api_connexion.schemas.backfill_schema import BackfillCollection, backfill_collection_schema
+from airflow.models.backfill import Backfill
+from airflow.utils import timezone
+
+
+class TestBackfillSchema:
+    def test_serialize_direct(self):
+        now = timezone.utcnow()
+        now_iso = now.isoformat()
+        b1 = Backfill(
+            dag_id="hi",
+            created_at=now,
+            completed_at=now,
+            from_date=now,
+            to_date=now,
+            updated_at=now,
+        )
+        bc = BackfillCollection(backfills=[b1], total_entries=1)
+        out = backfill_collection_schema.dump(bc)
+        assert out == {
+            "backfills": [
+                {
+                    "completed_at": now_iso,
+                    "created_at": now_iso,
+                    "dag_id": "hi",
+                    "dag_run_conf": None,
+                    "from_date": now_iso,
+                    "id": None,
+                    "is_paused": None,
+                    "max_active_runs": None,
+                    "to_date": now_iso,
+                    "updated_at": now_iso,
+                }
+            ],
+            "total_entries": 1,
+        }
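
The test above covers only the collection schema; a comparably minimal sketch (not part of this patch; field values are hypothetical) exercising BackfillDagRunSchema might look like:

    from airflow.api_connexion.schemas.backfill_schema import backfill_dag_run_schema
    from airflow.models.backfill import BackfillDagRun

    # Field values are invented for the example; "id" stays None because the
    # object is never persisted.
    bdr = BackfillDagRun(backfill_id=1, dag_run_id=10, sort_ordinal=1)
    out = backfill_dag_run_schema.dump(bdr)
    # Expected shape: {"id": None, "backfill_id": 1, "dag_run_id": 10, "sort_ordinal": 1}
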