utkarsharma2 commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1851919813


##########
airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details(
         map_index=map_index,
         session=session,
     )
+
+
+@task_instances_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=None,
+)
+def get_log(
+    *,
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    task_try_number: int,
+    request: Request,
+    session: Annotated[Session, Depends(get_session)],
+    full_content: bool = False,
+    map_index: int = -1,
+    token: str | None = None,
+) -> Response | dict:
+    """Get logs for specific task instance."""
+    if not token:
+        metadata = {}
+    else:
+        try:
+            metadata = 
URLSafeSerializer(request.app.state.secret_key).loads(token)
+        except BadSignature:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only 
the tokens provided by the API."
+            )
+
+    if metadata.get("download_logs") and metadata["download_logs"]:
+        full_content = True
+
+    if full_content:
+        metadata["download_logs"] = True
+    else:
+        metadata["download_logs"] = False
+
+    task_log_reader = TaskLogReader()
+
+    if not task_log_reader.supports_read:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler 
does not support read logs.")
+
+    query = (
+        select(TaskInstance)
+        .where(
+            TaskInstance.task_id == task_id,
+            TaskInstance.dag_id == dag_id,
+            TaskInstance.run_id == dag_run_id,
+            TaskInstance.map_index == map_index,
+        )
+        .join(TaskInstance.dag_run)
+        
.options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
+    )
+    ti = session.scalar(query)
+    if ti is None:
+        query = select(TaskInstanceHistory).where(
+            TaskInstanceHistory.task_id == task_id,
+            TaskInstanceHistory.dag_id == dag_id,
+            TaskInstanceHistory.run_id == dag_run_id,
+            TaskInstanceHistory.map_index == map_index,
+            TaskInstanceHistory.try_number == task_try_number,
+        )
+        ti = session.scalar(query)
+
+    if ti is None:
+        metadata["end_of_log"] = True
+        raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not 
found")
+
+    dag = request.app.state.dag_bag.get_dag(dag_id)
+    if dag:
+        try:
+            ti.task = dag.get_task(ti.task_id)
+        except TaskNotFound:
+            pass
+
+    return_type = request.headers["accept"]
+    # return_type would be either the above two or None
+    logs: Any
+    if return_type == "application/json" or return_type is None:  # default
+        logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, 
metadata)
+        logs = logs[0] if task_try_number is not None else logs
+        # we must have token here, so we can safely ignore it
+        token = 
URLSafeSerializer(request.app.state.secret_key).dumps(metadata)  # type: 
ignore[assignment]
+        return TaskInstancesLogResponseObject(continuation_token=token, 
content=str(logs)).model_dump()
+    # text/plain. Stream
+    logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
+    return Response(media_type="text/plain", content="".join(list(logs)))

Review Comment:
   I've switched to `accept: HeaderAcceptJsonOrText` as suggested. Please take another look (PTAL).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to