This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new d396ed9d364 [v3-0-test] Execution API: Improve task instance logging
with structlog context (#50120) (#50131)
d396ed9d364 is described below
commit d396ed9d36432bc9deff2687977000e0c8e35652
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat May 3 00:16:51 2025 +0530
[v3-0-test] Execution API: Improve task instance logging with structlog
context (#50120) (#50131)
- Added `ti_id` to all log contexts using structlog's `bind_contextvars`
- Removed redundant ti_id from individual log messages
- Added more detailed debug logs for important operations
(cherry picked from commit c9b2dca1c5150c8e767470699d9c1b751817ddd9)
Co-authored-by: Kaxil Naik <[email protected]>
---
.../execution_api/routes/task_instances.py | 91 +++++++++++++++++-----
1 file changed, 71 insertions(+), 20 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 9b0e2dc7a27..f97a684e3ff 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -18,11 +18,11 @@
from __future__ import annotations
import json
-import logging
from collections import defaultdict
from typing import Annotated, Any
from uuid import UUID
+import structlog
from cadwyn import VersionedAPIRouter
from fastapi import Body, Depends, HTTPException, Query, Request, status
from pydantic import JsonValue
@@ -30,6 +30,7 @@ from sqlalchemy import func, or_, tuple_, update
from sqlalchemy.exc import NoResultFound, SQLAlchemyError
from sqlalchemy.orm import joinedload
from sqlalchemy.sql import select
+from structlog.contextvars import bind_contextvars
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.types import UtcDateTime
@@ -67,7 +68,7 @@ ti_id_router = VersionedAPIRouter(
)
-log = logging.getLogger(__name__)
+log = structlog.get_logger(__name__)
@ti_id_router.patch(
@@ -90,6 +91,13 @@ def ti_run(
"""
# We only use UUID above for validation purposes
ti_id_str = str(task_instance_id)
+ bind_contextvars(ti_id=ti_id_str)
+ log.debug(
+ "Starting task instance run",
+ hostname=ti_run_payload.hostname,
+ unixname=ti_run_payload.unixname,
+ pid=ti_run_payload.pid,
+ )
from sqlalchemy.sql import column
from sqlalchemy.types import JSON
@@ -118,8 +126,9 @@ def ti_run(
)
try:
ti = session.execute(old).one()
+ log.debug("Retrieved task instance details", state=ti.state,
dag_id=ti.dag_id, task_id=ti.task_id)
except NoResultFound:
- log.error("Task Instance %s not found", ti_id_str)
+ log.error("Task Instance not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={
@@ -134,6 +143,7 @@ def ti_run(
# don't update start date when resuming from deferral
if ti.next_kwargs:
data.pop("start_date")
+ log.debug("Removed start_date from update as task is resuming from
deferral")
query = update(TI).where(TI.id == ti_id_str).values(data)
@@ -146,12 +156,11 @@ def ti_run(
ti_run_payload.unixname,
ti_run_payload.pid,
):
- log.info("Duplicate start request received from %s ",
ti_run_payload.hostname)
+ log.info("Duplicate start request received",
hostname=ti_run_payload.hostname)
elif previous_state not in (TaskInstanceState.QUEUED,
TaskInstanceState.RESTARTING):
log.warning(
- "Can not start Task Instance ('%s') in invalid state: %s",
- ti_id_str,
- previous_state,
+ "Cannot start Task Instance in invalid state",
+ previous_state=previous_state,
)
# TODO: Pass a RFC 9457 compliant error message in "detail" field
@@ -168,7 +177,7 @@ def ti_run(
},
)
else:
- log.info("Task with %s state started on %s ", previous_state,
ti_run_payload.hostname)
+ log.info("Task started", previous_state=previous_state,
hostname=ti_run_payload.hostname)
# Ensure there is no end date set.
query = query.values(
end_date=None,
@@ -181,7 +190,7 @@ def ti_run(
try:
result = session.execute(query)
- log.info("TI %s state updated: %s row(s) affected", ti_id_str,
result.rowcount)
+ log.info("Task instance state updated", rows_affected=result.rowcount)
dr = (
session.scalars(
@@ -194,6 +203,7 @@ def ti_run(
)
if not dr:
+ log.error("DagRun not found", dag_id=ti.dag_id, run_id=ti.run_id)
raise ValueError(f"DagRun with dag_id={ti.dag_id} and
run_id={ti.run_id} not found.")
# Send the keys to the SDK so that the client requests to clear those
XComs from the server.
@@ -268,10 +278,12 @@ def ti_update_state(
Not all state transitions are valid, and transitioning to some states
requires extra information to be
passed along. (Check out the datamodels for details, the rendered docs
might not reflect this accurately)
"""
- updated_state: str = ""
-
# We only use UUID above for validation purposes
ti_id_str = str(task_instance_id)
+ bind_contextvars(ti_id=ti_id_str)
+ log.debug("Updating task instance state", new_state=ti_patch_payload.state)
+
+ updated_state: str = ""
old = select(TI.state, TI.try_number, TI.max_tries, TI.dag_id).where(TI.id
== ti_id_str).with_for_update()
try:
@@ -281,8 +293,14 @@ def ti_update_state(
max_tries,
dag_id,
) = session.execute(old).one()
+ log.debug(
+ "Retrieved current task instance state",
+ previous_state=previous_state,
+ try_number=try_number,
+ max_tries=max_tries,
+ )
except NoResultFound:
- log.error("Task Instance %s not found", ti_id_str)
+ log.error("Task Instance not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={
@@ -293,9 +311,8 @@ def ti_update_state(
if previous_state != TaskInstanceState.RUNNING:
log.warning(
- "Cannot update Task Instance ('%s') because it is in an invalid
state: %s for an update",
- ti_id_str,
- previous_state,
+ "Cannot update Task Instance in invalid state",
+ previous_state=previous_state,
)
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
@@ -421,9 +438,9 @@ def ti_update_state(
#
https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers
try:
result = session.execute(query)
- log.info("TI %s state updated to %s: %s row(s) affected", ti_id_str,
updated_state, result.rowcount)
+ log.info("Task instance state updated", new_state=updated_state,
rows_affected=result.rowcount)
except SQLAlchemyError as e:
- log.error("Error updating Task Instance state: %s", e)
+ log.error("Error updating Task Instance state", error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Database error occurred"
)
@@ -443,12 +460,17 @@ def ti_skip_downstream(
session: SessionDep,
):
ti_id_str = str(task_instance_id)
+ bind_contextvars(ti_id=ti_id_str)
+ log.info("Skipping downstream tasks",
task_count=len(ti_patch_payload.tasks))
+
now = timezone.utcnow()
tasks = ti_patch_payload.tasks
dag_id, run_id = session.execute(select(TI.dag_id, TI.run_id).where(TI.id
== ti_id_str)).fetchone()
+ log.debug("Retrieved DAG and run info", dag_id=dag_id, run_id=run_id)
task_ids = [task if isinstance(task, tuple) else (task, -1) for task in
tasks]
+ log.debug("Prepared task IDs for skipping", task_ids=task_ids)
query = (
update(TI)
@@ -458,7 +480,7 @@ def ti_skip_downstream(
)
result = session.execute(query)
- log.info("TI %s updated the state of %s task(s) to skipped", ti_id_str,
result.rowcount)
+ log.info("Downstream tasks skipped", tasks_skipped=result.rowcount)
@ti_id_router.put(
@@ -479,6 +501,8 @@ def ti_heartbeat(
):
"""Update the heartbeat of a TaskInstance to mark it as alive & still
running."""
ti_id_str = str(task_instance_id)
+ bind_contextvars(ti_id=ti_id_str)
+ log.debug("Processing heartbeat", hostname=ti_payload.hostname,
pid=ti_payload.pid)
# Hot path: since heartbeating a task is a very common operation, we try
to do minimize the number of queries
# and DB round trips as much as possible.
@@ -487,8 +511,11 @@ def ti_heartbeat(
try:
(previous_state, hostname, pid) = session.execute(old).one()
+ log.debug(
+ "Retrieved current task state", state=previous_state,
current_hostname=hostname, current_pid=pid
+ )
except NoResultFound:
- log.error("Task Instance %s not found", ti_id_str)
+ log.error("Task Instance not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={
@@ -498,6 +525,13 @@ def ti_heartbeat(
)
if hostname != ti_payload.hostname or pid != ti_payload.pid:
+ log.warning(
+ "Task running elsewhere",
+ current_hostname=hostname,
+ current_pid=pid,
+ requested_hostname=ti_payload.hostname,
+ requested_pid=ti_payload.pid,
+ )
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
@@ -509,6 +543,7 @@ def ti_heartbeat(
)
if previous_state != TaskInstanceState.RUNNING:
+ log.warning("Task not in running state", current_state=previous_state)
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
@@ -520,7 +555,7 @@ def ti_heartbeat(
# Update the last heartbeat time!
session.execute(update(TI).where(TI.id ==
ti_id_str).values(last_heartbeat_at=timezone.utcnow()))
- log.debug("Task with %s state heartbeated", previous_state)
+ log.debug("Heartbeat updated", state=previous_state)
@ti_id_router.put(
@@ -543,12 +578,17 @@ def ti_put_rtif(
):
"""Add an RTIF entry for a task instance, sent by the worker."""
ti_id_str = str(task_instance_id)
+ bind_contextvars(ti_id=ti_id_str)
+ log.info("Updating RenderedTaskInstanceFields",
field_count=len(put_rtif_payload))
+
task_instance = session.scalar(select(TI).where(TI.id == ti_id_str))
if not task_instance:
+ log.error("Task Instance not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
)
task_instance.update_rtif(put_rtif_payload, session)
+ log.debug("RenderedTaskInstanceFields updated successfully")
return {"message": "Rendered task instance fields successfully set"}
@@ -569,8 +609,12 @@ def get_previous_successful_dagrun(
The data from this endpoint is used to get values for Task Context.
"""
ti_id_str = str(task_instance_id)
+ bind_contextvars(ti_id=ti_id_str)
+ log.debug("Retrieving previous successful DAG run")
+
task_instance = session.scalar(select(TI).where(TI.id == ti_id_str))
if not task_instance or not task_instance.logical_date:
+ log.debug("No task instance or logical date found")
return PrevSuccessfulDagRunResponse()
dag_run = session.scalar(
@@ -584,8 +628,15 @@ def get_previous_successful_dagrun(
.limit(1)
)
if not dag_run:
+ log.debug("No previous successful DAG run found")
return PrevSuccessfulDagRunResponse()
+ log.debug(
+ "Found previous successful DAG run",
+ dag_id=dag_run.dag_id,
+ run_id=dag_run.run_id,
+ logical_date=dag_run.logical_date,
+ )
return PrevSuccessfulDagRunResponse.model_validate(dag_run)