This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new d396ed9d364 [v3-0-test] Execution API: Improve task instance logging with structlog context (#50120) (#50131)
d396ed9d364 is described below

commit d396ed9d36432bc9deff2687977000e0c8e35652
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat May 3 00:16:51 2025 +0530

    [v3-0-test] Execution API: Improve task instance logging with structlog context (#50120) (#50131)
    
    - Add `ti_id` to all log contexts using structlog's `bind_contextvars`
    - Remove redundant `ti_id` from individual log messages
    - Add more detailed debug logs for important operations
    
    (cherry picked from commit c9b2dca1c5150c8e767470699d9c1b751817ddd9)
    
    Co-authored-by: Kaxil Naik <[email protected]>
---
 .../execution_api/routes/task_instances.py         | 91 +++++++++++++++++-----
 1 file changed, 71 insertions(+), 20 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 9b0e2dc7a27..f97a684e3ff 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -18,11 +18,11 @@
 from __future__ import annotations
 
 import json
-import logging
 from collections import defaultdict
 from typing import Annotated, Any
 from uuid import UUID
 
+import structlog
 from cadwyn import VersionedAPIRouter
 from fastapi import Body, Depends, HTTPException, Query, Request, status
 from pydantic import JsonValue
@@ -30,6 +30,7 @@ from sqlalchemy import func, or_, tuple_, update
 from sqlalchemy.exc import NoResultFound, SQLAlchemyError
 from sqlalchemy.orm import joinedload
 from sqlalchemy.sql import select
+from structlog.contextvars import bind_contextvars
 
 from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.common.types import UtcDateTime
@@ -67,7 +68,7 @@ ti_id_router = VersionedAPIRouter(
 )
 
 
-log = logging.getLogger(__name__)
+log = structlog.get_logger(__name__)
 
 
 @ti_id_router.patch(
@@ -90,6 +91,13 @@ def ti_run(
     """
     # We only use UUID above for validation purposes
     ti_id_str = str(task_instance_id)
+    bind_contextvars(ti_id=ti_id_str)
+    log.debug(
+        "Starting task instance run",
+        hostname=ti_run_payload.hostname,
+        unixname=ti_run_payload.unixname,
+        pid=ti_run_payload.pid,
+    )
 
     from sqlalchemy.sql import column
     from sqlalchemy.types import JSON
@@ -118,8 +126,9 @@ def ti_run(
     )
     try:
         ti = session.execute(old).one()
+        log.debug("Retrieved task instance details", state=ti.state, 
dag_id=ti.dag_id, task_id=ti.task_id)
     except NoResultFound:
-        log.error("Task Instance %s not found", ti_id_str)
+        log.error("Task Instance not found")
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND,
             detail={
@@ -134,6 +143,7 @@ def ti_run(
     # don't update start date when resuming from deferral
     if ti.next_kwargs:
         data.pop("start_date")
+        log.debug("Removed start_date from update as task is resuming from 
deferral")
 
     query = update(TI).where(TI.id == ti_id_str).values(data)
 
@@ -146,12 +156,11 @@ def ti_run(
         ti_run_payload.unixname,
         ti_run_payload.pid,
     ):
-        log.info("Duplicate start request received from %s ", 
ti_run_payload.hostname)
+        log.info("Duplicate start request received", 
hostname=ti_run_payload.hostname)
     elif previous_state not in (TaskInstanceState.QUEUED, 
TaskInstanceState.RESTARTING):
         log.warning(
-            "Can not start Task Instance ('%s') in invalid state: %s",
-            ti_id_str,
-            previous_state,
+            "Cannot start Task Instance in invalid state",
+            previous_state=previous_state,
         )
 
         # TODO: Pass a RFC 9457 compliant error message in "detail" field
@@ -168,7 +177,7 @@ def ti_run(
             },
         )
     else:
-        log.info("Task with %s state started on %s ", previous_state, 
ti_run_payload.hostname)
+        log.info("Task started", previous_state=previous_state, 
hostname=ti_run_payload.hostname)
     # Ensure there is no end date set.
     query = query.values(
         end_date=None,
@@ -181,7 +190,7 @@ def ti_run(
 
     try:
         result = session.execute(query)
-        log.info("TI %s state updated: %s row(s) affected", ti_id_str, 
result.rowcount)
+        log.info("Task instance state updated", rows_affected=result.rowcount)
 
         dr = (
             session.scalars(
@@ -194,6 +203,7 @@ def ti_run(
         )
 
         if not dr:
+            log.error("DagRun not found", dag_id=ti.dag_id, run_id=ti.run_id)
             raise ValueError(f"DagRun with dag_id={ti.dag_id} and 
run_id={ti.run_id} not found.")
 
         # Send the keys to the SDK so that the client requests to clear those 
XComs from the server.
@@ -268,10 +278,12 @@ def ti_update_state(
     Not all state transitions are valid, and transitioning to some states 
requires extra information to be
     passed along. (Check out the datamodels for details, the rendered docs 
might not reflect this accurately)
     """
-    updated_state: str = ""
-
     # We only use UUID above for validation purposes
     ti_id_str = str(task_instance_id)
+    bind_contextvars(ti_id=ti_id_str)
+    log.debug("Updating task instance state", new_state=ti_patch_payload.state)
+
+    updated_state: str = ""
 
     old = select(TI.state, TI.try_number, TI.max_tries, TI.dag_id).where(TI.id 
== ti_id_str).with_for_update()
     try:
@@ -281,8 +293,14 @@ def ti_update_state(
             max_tries,
             dag_id,
         ) = session.execute(old).one()
+        log.debug(
+            "Retrieved current task instance state",
+            previous_state=previous_state,
+            try_number=try_number,
+            max_tries=max_tries,
+        )
     except NoResultFound:
-        log.error("Task Instance %s not found", ti_id_str)
+        log.error("Task Instance not found")
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND,
             detail={
@@ -293,9 +311,8 @@ def ti_update_state(
 
     if previous_state != TaskInstanceState.RUNNING:
         log.warning(
-            "Cannot update Task Instance ('%s') because it is in an invalid 
state: %s for an update",
-            ti_id_str,
-            previous_state,
+            "Cannot update Task Instance in invalid state",
+            previous_state=previous_state,
         )
         raise HTTPException(
             status_code=status.HTTP_409_CONFLICT,
@@ -421,9 +438,9 @@ def ti_update_state(
     # 
https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers
     try:
         result = session.execute(query)
-        log.info("TI %s state updated to %s: %s row(s) affected", ti_id_str, 
updated_state, result.rowcount)
+        log.info("Task instance state updated", new_state=updated_state, 
rows_affected=result.rowcount)
     except SQLAlchemyError as e:
-        log.error("Error updating Task Instance state: %s", e)
+        log.error("Error updating Task Instance state", error=str(e))
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 
detail="Database error occurred"
         )
@@ -443,12 +460,17 @@ def ti_skip_downstream(
     session: SessionDep,
 ):
     ti_id_str = str(task_instance_id)
+    bind_contextvars(ti_id=ti_id_str)
+    log.info("Skipping downstream tasks", 
task_count=len(ti_patch_payload.tasks))
+
     now = timezone.utcnow()
     tasks = ti_patch_payload.tasks
 
     dag_id, run_id = session.execute(select(TI.dag_id, TI.run_id).where(TI.id 
== ti_id_str)).fetchone()
+    log.debug("Retrieved DAG and run info", dag_id=dag_id, run_id=run_id)
 
     task_ids = [task if isinstance(task, tuple) else (task, -1) for task in 
tasks]
+    log.debug("Prepared task IDs for skipping", task_ids=task_ids)
 
     query = (
         update(TI)
@@ -458,7 +480,7 @@ def ti_skip_downstream(
     )
 
     result = session.execute(query)
-    log.info("TI %s updated the state of %s task(s) to skipped", ti_id_str, 
result.rowcount)
+    log.info("Downstream tasks skipped", tasks_skipped=result.rowcount)
 
 
 @ti_id_router.put(
@@ -479,6 +501,8 @@ def ti_heartbeat(
 ):
     """Update the heartbeat of a TaskInstance to mark it as alive & still 
running."""
     ti_id_str = str(task_instance_id)
+    bind_contextvars(ti_id=ti_id_str)
+    log.debug("Processing heartbeat", hostname=ti_payload.hostname, 
pid=ti_payload.pid)
 
     # Hot path: since heartbeating a task is a very common operation, we try 
to do minimize the number of queries
     # and DB round trips as much as possible.
@@ -487,8 +511,11 @@ def ti_heartbeat(
 
     try:
         (previous_state, hostname, pid) = session.execute(old).one()
+        log.debug(
+            "Retrieved current task state", state=previous_state, 
current_hostname=hostname, current_pid=pid
+        )
     except NoResultFound:
-        log.error("Task Instance %s not found", ti_id_str)
+        log.error("Task Instance not found")
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND,
             detail={
@@ -498,6 +525,13 @@ def ti_heartbeat(
         )
 
     if hostname != ti_payload.hostname or pid != ti_payload.pid:
+        log.warning(
+            "Task running elsewhere",
+            current_hostname=hostname,
+            current_pid=pid,
+            requested_hostname=ti_payload.hostname,
+            requested_pid=ti_payload.pid,
+        )
         raise HTTPException(
             status_code=status.HTTP_409_CONFLICT,
             detail={
@@ -509,6 +543,7 @@ def ti_heartbeat(
         )
 
     if previous_state != TaskInstanceState.RUNNING:
+        log.warning("Task not in running state", current_state=previous_state)
         raise HTTPException(
             status_code=status.HTTP_409_CONFLICT,
             detail={
@@ -520,7 +555,7 @@ def ti_heartbeat(
 
     # Update the last heartbeat time!
     session.execute(update(TI).where(TI.id == 
ti_id_str).values(last_heartbeat_at=timezone.utcnow()))
-    log.debug("Task with %s state heartbeated", previous_state)
+    log.debug("Heartbeat updated", state=previous_state)
 
 
 @ti_id_router.put(
@@ -543,12 +578,17 @@ def ti_put_rtif(
 ):
     """Add an RTIF entry for a task instance, sent by the worker."""
     ti_id_str = str(task_instance_id)
+    bind_contextvars(ti_id=ti_id_str)
+    log.info("Updating RenderedTaskInstanceFields", 
field_count=len(put_rtif_payload))
+
     task_instance = session.scalar(select(TI).where(TI.id == ti_id_str))
     if not task_instance:
+        log.error("Task Instance not found")
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND,
         )
     task_instance.update_rtif(put_rtif_payload, session)
+    log.debug("RenderedTaskInstanceFields updated successfully")
 
     return {"message": "Rendered task instance fields successfully set"}
 
@@ -569,8 +609,12 @@ def get_previous_successful_dagrun(
     The data from this endpoint is used to get values for Task Context.
     """
     ti_id_str = str(task_instance_id)
+    bind_contextvars(ti_id=ti_id_str)
+    log.debug("Retrieving previous successful DAG run")
+
     task_instance = session.scalar(select(TI).where(TI.id == ti_id_str))
     if not task_instance or not task_instance.logical_date:
+        log.debug("No task instance or logical date found")
         return PrevSuccessfulDagRunResponse()
 
     dag_run = session.scalar(
@@ -584,8 +628,15 @@ def get_previous_successful_dagrun(
         .limit(1)
     )
     if not dag_run:
+        log.debug("No previous successful DAG run found")
         return PrevSuccessfulDagRunResponse()
 
+    log.debug(
+        "Found previous successful DAG run",
+        dag_id=dag_run.dag_id,
+        run_id=dag_run.run_id,
+        logical_date=dag_run.logical_date,
+    )
     return PrevSuccessfulDagRunResponse.model_validate(dag_run)
 
 

Reply via email to