pierrejeambrun commented on code in PR #43496: URL: https://github.com/apache/airflow/pull/43496#discussion_r1824069814
########## airflow/api_fastapi/core_api/routes/public/backfills.py: ########## @@ -0,0 +1,172 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from fastapi import Depends, HTTPException +from sqlalchemy import select, update +from sqlalchemy.orm import Session +from typing_extensions import Annotated + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.openapi.exceptions import ( + create_openapi_http_exception_doc, +) +from airflow.api_fastapi.core_api.serializers.backfills import ( + BackfillCollectionResponse, + BackfillPostBody, + BackfillResponse, +) +from airflow.models import DagRun +from airflow.models.backfill import ( + AlreadyRunningBackfill, + Backfill, + BackfillDagRun, + _create_backfill, +) +from airflow.utils import timezone +from airflow.utils.state import DagRunState + +backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills") + + +@backfills_router.get( + path="/", + responses=create_openapi_http_exception_doc([401, 403]), +) +async def list_backfills( + dag_id: str, + session: Annotated[Session, Depends(get_session)], +): + backfills = 
session.scalars(select(Backfill).where(Backfill.dag_id == dag_id)).all() + return BackfillCollectionResponse( + backfills=[BackfillResponse.model_validate(x, from_attributes=True) for x in backfills], + total_entries=len(backfills), + ) + + +@backfills_router.get( + path="/{backfill_id}", + responses=create_openapi_http_exception_doc([401, 403, 404]), +) +async def get_backfill( + backfill_id: str, + session: Annotated[Session, Depends(get_session)], +): + backfill = session.get(Backfill, backfill_id) + if backfill: + return BackfillResponse.model_validate(backfill, from_attributes=True) + raise HTTPException(404, "Backfill not found") + + +@backfills_router.put( + path="/{backfill_id}/pause", + responses=create_openapi_http_exception_doc([401, 403, 404, 409]), +) +async def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]): + b = session.get(Backfill, backfill_id) + if not b: + raise HTTPException(404, f"Could not find backfill with id {backfill_id}") + if b.completed_at: + raise HTTPException(409, "Backfill is already completed.") + if b.is_paused is False: + b.is_paused = True + session.commit() + return BackfillResponse.model_validate(b, from_attributes=True) + + +@backfills_router.put( + path="/{backfill_id}/unpause", + responses=create_openapi_http_exception_doc([401, 403, 404, 409]), +) +async def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]): + b = session.get(Backfill, backfill_id) + if not b: + raise HTTPException(404, f"Could not find backfill with id {backfill_id}") + if b.completed_at: + raise HTTPException(409, "Backfill is already completed.") + if b.is_paused: + b.is_paused = False + session.commit() Review Comment: The dependency should commit on exit of the context manager, I don't think we need to manually commit. 
########## airflow/api_fastapi/core_api/routes/public/backfills.py: ########## @@ -0,0 +1,172 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from fastapi import Depends, HTTPException +from sqlalchemy import select, update +from sqlalchemy.orm import Session +from typing_extensions import Annotated + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.openapi.exceptions import ( + create_openapi_http_exception_doc, +) +from airflow.api_fastapi.core_api.serializers.backfills import ( + BackfillCollectionResponse, + BackfillPostBody, + BackfillResponse, +) +from airflow.models import DagRun +from airflow.models.backfill import ( + AlreadyRunningBackfill, + Backfill, + BackfillDagRun, + _create_backfill, +) +from airflow.utils import timezone +from airflow.utils.state import DagRunState + +backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills") + + +@backfills_router.get( + path="/", + responses=create_openapi_http_exception_doc([401, 403]), +) +async def list_backfills( + dag_id: str, + session: Annotated[Session, Depends(get_session)], Review Comment: Do we want 
pagination like other `list` endpoints? ########## tests/api_fastapi/core_api/routes/public/test_backfills.py: ########## @@ -92,22 +80,17 @@ def configured_app(minimal_app_for_api): dag_bag = DagBag(os.devnull, include_examples=False) dag_bag.dags = {dag.dag_id: dag, dag2.dag_id: dag2, dag3.dag_id: dag3} - app.dag_bag = dag_bag - yield app - - delete_user(app, username="test") - delete_user(app, username="test_no_permissions") +def to_iso(val): + return pendulum.instance(val).to_iso8601_string() class TestBackfillEndpoint: - @pytest.fixture(autouse=True) - def setup_attrs(self, configured_app) -> None: - self.app = configured_app - self.client = self.app.test_client() # type:ignore - self.dag_id = DAG_ID - self.dag2_id = DAG2_ID - self.dag3_id = DAG3_ID + # @pytest.fixture(autouse=True) + # def setup_attrs(self): + # self.dag_id = DAG_ID + # self.dag2_id = DAG2_ID + # self.dag3_id = DAG3_ID Review Comment: Can we remove that if it is not needed? ########## airflow/api_fastapi/core_api/serializers/backfills.py: ########## @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from datetime import datetime +from typing import Callable + +from pydantic import BaseModel + +from airflow.models.backfill import ReprocessBehavior + + +def _call_function(function: Callable[[], int]) -> int: + """ + Call the given function. + + Used for the BeforeValidator to get the actual values from the bound method. + """ + return function() + Review Comment: I don't think we need this ########## airflow/api_fastapi/core_api/routes/public/backfills.py: ########## @@ -0,0 +1,172 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from fastapi import Depends, HTTPException +from sqlalchemy import select, update +from sqlalchemy.orm import Session +from typing_extensions import Annotated + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.openapi.exceptions import ( + create_openapi_http_exception_doc, +) +from airflow.api_fastapi.core_api.serializers.backfills import ( + BackfillCollectionResponse, + BackfillPostBody, + BackfillResponse, +) +from airflow.models import DagRun +from airflow.models.backfill import ( + AlreadyRunningBackfill, + Backfill, + BackfillDagRun, + _create_backfill, +) +from airflow.utils import timezone +from airflow.utils.state import DagRunState + +backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills") + + +@backfills_router.get( + path="/", + responses=create_openapi_http_exception_doc([401, 403]), +) +async def list_backfills( + dag_id: str, + session: Annotated[Session, Depends(get_session)], +): + backfills = session.scalars(select(Backfill).where(Backfill.dag_id == dag_id)).all() + return BackfillCollectionResponse( + backfills=[BackfillResponse.model_validate(x, from_attributes=True) for x in backfills], + total_entries=len(backfills), + ) + + +@backfills_router.get( + path="/{backfill_id}", + responses=create_openapi_http_exception_doc([401, 403, 404]), +) +async def get_backfill( + backfill_id: str, + session: Annotated[Session, Depends(get_session)], +): + backfill = session.get(Backfill, backfill_id) + if backfill: + return BackfillResponse.model_validate(backfill, from_attributes=True) + raise HTTPException(404, "Backfill not found") + + +@backfills_router.put( + path="/{backfill_id}/pause", + responses=create_openapi_http_exception_doc([401, 403, 404, 409]), +) +async def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]): + b = session.get(Backfill, backfill_id) 
+ if not b: + raise HTTPException(404, f"Could not find backfill with id {backfill_id}") + if b.completed_at: + raise HTTPException(409, "Backfill is already completed.") + if b.is_paused is False: + b.is_paused = True + session.commit() + return BackfillResponse.model_validate(b, from_attributes=True) + + +@backfills_router.put( + path="/{backfill_id}/unpause", + responses=create_openapi_http_exception_doc([401, 403, 404, 409]), +) +async def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]): + b = session.get(Backfill, backfill_id) + if not b: + raise HTTPException(404, f"Could not find backfill with id {backfill_id}") + if b.completed_at: + raise HTTPException(409, "Backfill is already completed.") + if b.is_paused: + b.is_paused = False + session.commit() + return BackfillResponse.model_validate(b, from_attributes=True) + + +@backfills_router.put( + path="/{backfill_id}/cancel", + responses=create_openapi_http_exception_doc([401, 403, 404, 409]), +) +async def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]): + b: Backfill = session.get(Backfill, backfill_id) + if not b: + raise HTTPException(404, f"Could not find backfill with id {backfill_id}") + if b.completed_at is not None: + raise HTTPException(409, "Backfill is already completed.") + + # first, pause + if not b.is_paused: + b.is_paused = True + + session.commit() Review Comment: An intermediary `session.commit` should be replaced by `session.refresh` or removed entirely, because if we have multiple commits in the same request, we lose atomicity. (If the code fails after that commit, the database is left in an inconsistent state.) Usually it is 1 request -> 1 commit on exit. We can use `refresh` to retrieve DB-side auto-generated IDs and such. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
