seanghaeli commented on code in PR #66610:
URL: https://github.com/apache/airflow/pull/66610#discussion_r3450025081
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py:
##########
@@ -201,3 +206,80 @@ def get_dag_deadline_alerts(
alerts = session.scalars(alerts_select)
return DeadlineAlertCollectionResponse(deadline_alerts=alerts,
total_entries=total_entries)
+
+
+@deadlines_router.get(
+ "/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+ dependencies=[
+ Depends(
+ requires_access_dag(
+ method="GET",
+ access_entity=DagAccessEntity.TASK_LOGS,
+ )
+ ),
+ ],
+ response_model=TaskInstancesLogResponse,
+ response_model_exclude_unset=True,
+)
+def get_callback_logs(
+ dag_id: str,
+ dag_run_id: str,
+ callback_id: UUID,
+ session: SessionDep,
+) -> TaskInstancesLogResponse:
+ """
+ Get execution logs for a callback associated with a deadline.
+
+ Returns the logs produced during callback execution. These logs are
uploaded
+ to remote storage (or written locally) by the callback supervisor after
execution.
+ """
+ # Sanitize path components to prevent path traversal via URL parameters.
+ for param_name, param_value in [("dag_id", dag_id), ("dag_run_id",
dag_run_id)]:
+ if os.sep in param_value or "\\" in param_value or ".." in param_value:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ f"Invalid characters in {param_name}",
+ )
+
+ # Verify the callback exists
+ callback = session.scalar(select(Callback).where(Callback.id ==
callback_id))
+ if callback is None:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"Callback with id `{callback_id}` was not found",
+ )
+
+ # Verify the dag_run exists with matching dag_id
+ dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id,
DagRun.run_id == dag_run_id))
+ if dag_run is None:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was
not found",
+ )
+
+ # Verify the callback actually belongs to this dag_run (via the Deadline
relationship)
+ deadline = session.scalar(
+ select(Deadline).where(Deadline.callback_id == callback_id,
Deadline.dagrun_id == dag_run.id)
+ )
+ if deadline is None:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"Callback `{callback_id}` is not associated with DagRun
`{dag_run_id}` of Dag `{dag_id}`",
+ )
+
+ try:
+ log_stream = read_callback_log(
+ dag_id=dag_id,
+ run_id=dag_run_id,
+ callback_id=str(callback_id),
+ )
+ except ValueError:
+ raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid callback log
path")
+
+ content = list(log_stream)
+ return TaskInstancesLogResponse.model_construct(content=content,
continuation_token=None)
Review Comment:
Good callout. I'd rather include this fix in a separate PR, since (1) it's an
edge case that usually doesn't occur and (2) it would require some thought about
how to handle the UI consumer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]