dstandish commented on code in PR #43496:
URL: https://github.com/apache/airflow/pull/43496#discussion_r1824850656


##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from fastapi import Depends, HTTPException
+from sqlalchemy import select, update
+from sqlalchemy.orm import Session
+from typing_extensions import Annotated
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.openapi.exceptions import (
+    create_openapi_http_exception_doc,
+)
+from airflow.api_fastapi.core_api.serializers.backfills import (
+    BackfillCollectionResponse,
+    BackfillPostBody,
+    BackfillResponse,
+)
+from airflow.models import DagRun
+from airflow.models.backfill import (
+    AlreadyRunningBackfill,
+    Backfill,
+    BackfillDagRun,
+    _create_backfill,
+)
+from airflow.utils import timezone
+from airflow.utils.state import DagRunState
+
+backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills")
+
+
+@backfills_router.get(
+    path="/",
+    responses=create_openapi_http_exception_doc([401, 403]),
+)
+async def list_backfills(
+    dag_id: str,
+    session: Annotated[Session, Depends(get_session)],
+):
+    backfills = session.scalars(select(Backfill).where(Backfill.dag_id == 
dag_id)).all()
+    return BackfillCollectionResponse(
+        backfills=[BackfillResponse.model_validate(x, from_attributes=True) 
for x in backfills],
+        total_entries=len(backfills),
+    )
+
+
+@backfills_router.get(
+    path="/{backfill_id}",
+    responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_backfill(
+    backfill_id: str,
+    session: Annotated[Session, Depends(get_session)],
+):
+    backfill = session.get(Backfill, backfill_id)
+    if backfill:
+        return BackfillResponse.model_validate(backfill, from_attributes=True)
+    raise HTTPException(404, "Backfill not found")
+
+
+@backfills_router.put(
+    path="/{backfill_id}/pause",
+    responses=create_openapi_http_exception_doc([401, 403, 404, 409]),
+)
+async def pause_backfill(*, backfill_id, session: Annotated[Session, 
Depends(get_session)]):
+    b = session.get(Backfill, backfill_id)
+    if not b:
+        raise HTTPException(404, f"Could not find backfill with id 
{backfill_id}")
+    if b.completed_at:
+        raise HTTPException(409, "Backfill is already completed.")
+    if b.is_paused is False:
+        b.is_paused = True
+    session.commit()
+    return BackfillResponse.model_validate(b, from_attributes=True)
+
+
+@backfills_router.put(
+    path="/{backfill_id}/unpause",
+    responses=create_openapi_http_exception_doc([401, 403, 404, 409]),
+)
+async def unpause_backfill(*, backfill_id, session: Annotated[Session, 
Depends(get_session)]):
+    b = session.get(Backfill, backfill_id)
+    if not b:
+        raise HTTPException(404, f"Could not find backfill with id 
{backfill_id}")
+    if b.completed_at:
+        raise HTTPException(409, "Backfill is already completed.")
+    if b.is_paused:
+        b.is_paused = False
+    session.commit()
+    return BackfillResponse.model_validate(b, from_attributes=True)
+
+
+@backfills_router.put(
+    path="/{backfill_id}/cancel",
+    responses=create_openapi_http_exception_doc([401, 403, 404, 409]),
+)
+async def cancel_backfill(*, backfill_id, session: Annotated[Session, 
Depends(get_session)]):
+    b: Backfill = session.get(Backfill, backfill_id)
+    if not b:
+        raise HTTPException(404, f"Could not find backfill with id 
{backfill_id}")
+    if b.completed_at is not None:
+        raise HTTPException(409, "Backfill is already completed.")
+
+    # first, pause
+    if not b.is_paused:
+        b.is_paused = True
+
+    session.commit()

Review Comment:
   so i just pause first to ensure that no other runs are started.  if it blows 
up after this, nothing bad happens, but it's a partial success; what do you 
think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to