This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 66d3df23914 Remove connections file in Execution API (#43761)
66d3df23914 is described below
commit 66d3df23914e938ed2258d54b571a41e5263923c
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Nov 6 17:18:54 2024 +0000
Remove connections file in Execution API (#43761)
Bad merge
---
.../execution_api/routes/connections.py | 131 ---------------------
.../execution_api/routes/task_instance.py | 4 +-
2 files changed, 2 insertions(+), 133 deletions(-)
diff --git a/airflow/api_fastapi/execution_api/routes/connections.py
b/airflow/api_fastapi/execution_api/routes/connections.py
deleted file mode 100644
index 655639f8fb2..00000000000
--- a/airflow/api_fastapi/execution_api/routes/connections.py
+++ /dev/null
@@ -1,131 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from __future__ import annotations
-
-import logging
-from uuid import UUID
-
-from fastapi import Body, Depends, HTTPException, status
-from sqlalchemy import update
-from sqlalchemy.exc import NoResultFound, SQLAlchemyError
-from sqlalchemy.orm import Session
-from sqlalchemy.sql import select
-from typing_extensions import Annotated
-
-from airflow.api_fastapi.common.db.common import get_session
-from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.execution_api import schemas
-from airflow.models.taskinstance import TaskInstance as TI
-from airflow.utils.state import State
-
-# TODO: Add dependency on JWT token
-ti_router = AirflowRouter(
- prefix="/connections",
- tags=["Task Instance"],
-)
-
-
-log = logging.getLogger(__name__)
-
-
-@ti_router.patch(
- "/{task_instance_id}/state",
- status_code=status.HTTP_204_NO_CONTENT,
- # TODO: Add Operation ID to control the function name in the OpenAPI spec
- # TODO: Do we need to use create_openapi_http_exception_doc here?
- responses={
- status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
- status.HTTP_409_CONFLICT: {"description": "The TI is already in the
requested state"},
- status.HTTP_422_UNPROCESSABLE_ENTITY: {"description": "Invalid payload
for the state transition"},
- },
-)
-async def ti_update_state(
- task_instance_id: UUID,
- ti_patch_payload: Annotated[schemas.TIStateUpdate, Body()],
- session: Annotated[Session, Depends(get_session)],
-):
- """
- Update the state of a TaskInstance.
-
- Not all state transitions are valid, and transitioning to some states
required extra information to be
- passed along. (Check our the schemas for details, the rendered docs might
not reflect this accurately)
- """
- # We only use UUID above for validation purposes
- ti_id_str = str(task_instance_id)
-
- old = select(TI.state).where(TI.id == ti_id_str).with_for_update()
- try:
- (previous_state,) = session.execute(old).one()
- except NoResultFound:
- log.error("Task Instance %s not found", ti_id_str)
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail={
- "reason": "not_found",
- "message": "Task Instance not found",
- },
- )
-
- # We exclude_unset to avoid updating fields that are not set in the payload
- data = ti_patch_payload.model_dump(exclude_unset=True)
-
- query = update(TI).where(TI.id == ti_id_str).values(data)
-
- if isinstance(ti_patch_payload, schemas.TIEnterRunningPayload):
- if previous_state != State.QUEUED:
- log.warning(
- "Can not start Task Instance ('%s') in invalid state: %s",
- ti_id_str,
- previous_state,
- )
-
- # TODO: Pass a RFC 9457 compliant error message in "detail" field
- # https://datatracker.ietf.org/doc/html/rfc9457
- # to provide more information about the error
- # FastAPI will automatically convert this to a JSON response
- # This might be added in FastAPI in
https://github.com/fastapi/fastapi/issues/10370
- raise HTTPException(
- status_code=status.HTTP_409_CONFLICT,
- detail={
- "reason": "invalid_state",
- "message": "TI was not in a state where it could be marked
as running",
- "previous_state": previous_state,
- },
- )
- log.info("Task with %s state started on %s ", previous_state,
ti_patch_payload.hostname)
- # Ensure there is no end date set.
- query = query.values(
- end_date=None,
- hostname=ti_patch_payload.hostname,
- unixname=ti_patch_payload.unixname,
- pid=ti_patch_payload.pid,
- state=State.RUNNING,
- )
- elif isinstance(ti_patch_payload, schemas.TITerminalStatePayload):
- query = TI.duration_expression_update(ti_patch_payload.end_date,
query, session.bind)
-
- # TODO: Replace this with FastAPI's Custom Exception handling:
- #
https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers
- try:
- result = session.execute(query)
- log.info("TI %s state updated: %s row(s) affected", ti_id_str,
result.rowcount)
- except SQLAlchemyError as e:
- log.error("Error updating Task Instance state: %s", e)
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Database error occurred"
- )
diff --git a/airflow/api_fastapi/execution_api/routes/task_instance.py
b/airflow/api_fastapi/execution_api/routes/task_instance.py
index cb952920e52..ddf4055d4d0 100644
--- a/airflow/api_fastapi/execution_api/routes/task_instance.py
+++ b/airflow/api_fastapi/execution_api/routes/task_instance.py
@@ -64,8 +64,8 @@ async def ti_update_state(
"""
Update the state of a TaskInstance.
- Not all state transitions are valid, and transitioning to some states
required extra information to be
- passed along. (Check our the schemas for details, the rendered docs might
not reflect this accurately)
+ Not all state transitions are valid, and transitioning to some states
requires extra information to be
+ passed along. (Check out the schemas for details, the rendered docs might
not reflect this accurately)
"""
# We only use UUID above for validation purposes
ti_id_str = str(task_instance_id)