This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5749e12c49f Support TaskInstanceHistory in log handlers (#51592)
5749e12c49f is described below

commit 5749e12c49fb985de9f5fac638db266c0e13f7d2
Author: Ankit Chaurasia <[email protected]>
AuthorDate: Fri Jun 13 10:31:51 2025 +0545

    Support TaskInstanceHistory in log handlers (#51592)
    
    Support TaskInstanceHistory in log handlers
---
 .../api_fastapi/core_api/routes/public/log.py      |  1 +
 .../src/airflow/models/taskinstancehistory.py      | 11 +++++
 .../src/airflow/utils/log/file_task_handler.py     | 54 +++++++---------------
 airflow-core/src/airflow/utils/log/log_reader.py   | 15 ++++--
 airflow-core/tests/unit/utils/test_log_handlers.py | 33 +++++++++++++
 5 files changed, 74 insertions(+), 40 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
index 282e7b8f945..01cf859f05e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
@@ -112,6 +112,7 @@ def get_log(
             TaskInstance.dag_id == dag_id,
             TaskInstance.run_id == dag_run_id,
             TaskInstance.map_index == map_index,
+            TaskInstance.try_number == try_number,
         )
         .join(TaskInstance.dag_run)
         
.options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
diff --git a/airflow-core/src/airflow/models/taskinstancehistory.py 
b/airflow-core/src/airflow/models/taskinstancehistory.py
index 7fdd55178e5..cbd83643fdb 100644
--- a/airflow-core/src/airflow/models/taskinstancehistory.py
+++ b/airflow-core/src/airflow/models/taskinstancehistory.py
@@ -52,6 +52,7 @@ from airflow.utils.state import State, TaskInstanceState
 if TYPE_CHECKING:
     from sqlalchemy.orm.session import Session
 
+    from airflow.models import DagRun
     from airflow.models.taskinstance import TaskInstance
 
 
@@ -161,6 +162,11 @@ class TaskInstanceHistory(Base):
         Index("idx_tih_dag_run", dag_id, run_id),
     )
 
+    @property
+    def id(self) -> str:
+        """Alias for primary key field to support TaskInstance."""
+        return self.task_instance_id
+
     @staticmethod
     @provide_session
     def record_ti(ti: TaskInstance, session: Session = NEW_SESSION) -> None:
@@ -183,3 +189,8 @@ class TaskInstanceHistory(Base):
             ti.set_duration()
         ti_history = TaskInstanceHistory(ti, state=ti_history_state)
         session.add(ti_history)
+
+    @provide_session
+    def get_dagrun(self, session: Session = NEW_SESSION) -> DagRun:
+        """Return the DagRun for this TaskInstanceHistory, matching 
TaskInstance."""
+        return self.dag_run
diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py 
b/airflow-core/src/airflow/utils/log/file_task_handler.py
index c39792baa81..15339c00cbb 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -34,7 +34,6 @@ import pendulum
 from pydantic import BaseModel, ConfigDict, ValidationError
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.helpers import parse_template_string, render_template
 from airflow.utils.log.logging_mixin import SetContextPropagate
@@ -45,7 +44,7 @@ from airflow.utils.state import State, TaskInstanceState
 if TYPE_CHECKING:
     from airflow.executors.base_executor import BaseExecutor
     from airflow.models.taskinstance import TaskInstance
-    from airflow.models.taskinstancekey import TaskInstanceKey
+    from airflow.models.taskinstancehistory import TaskInstanceHistory
     from airflow.typing_compat import TypeAlias
 
 
@@ -180,32 +179,6 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
         last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
-    """
-    Given TI | TIKey, return a TI object.
-
-    Will raise exception if no TI is found in the database.
-    """
-    from airflow.models.taskinstance import TaskInstance
-
-    if isinstance(ti, TaskInstance):
-        return ti
-    val = (
-        session.query(TaskInstance)
-        .filter(
-            TaskInstance.task_id == ti.task_id,
-            TaskInstance.dag_id == ti.dag_id,
-            TaskInstance.run_id == ti.run_id,
-            TaskInstance.map_index == ti.map_index,
-        )
-        .one_or_none()
-    )
-    if not val:
-        raise AirflowException(f"Could not find TaskInstance for {ti}")
-    val.try_number = ti.try_number
-    return val
-
-
 class FileTaskHandler(logging.Handler):
     """
     FileTaskHandler is a python log handler that handles and reads task 
instance logs.
@@ -253,7 +226,9 @@ class FileTaskHandler(logging.Handler):
         Some handlers emit "end of log" markers, and may not wish to do so 
when task defers.
         """
 
-    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) 
-> None | SetContextPropagate:
+    def set_context(
+        self, ti: TaskInstance | TaskInstanceHistory, *, identifier: str | 
None = None
+    ) -> None | SetContextPropagate:
         """
         Provide task_instance context to airflow task handler.
 
@@ -309,9 +284,10 @@ class FileTaskHandler(logging.Handler):
             self.handler.close()
 
     @provide_session
-    def _render_filename(self, ti: TaskInstance, try_number: int, 
session=NEW_SESSION) -> str:
+    def _render_filename(
+        self, ti: TaskInstance | TaskInstanceHistory, try_number: int, 
session=NEW_SESSION
+    ) -> str:
         """Return the worker log filename."""
-        ti = _ensure_ti(ti, session)
         dag_run = ti.get_dagrun(session=session)
 
         date = dag_run.logical_date or dag_run.run_after
@@ -344,8 +320,8 @@ class FileTaskHandler(logging.Handler):
         raise RuntimeError(f"Unable to render log filename for {ti}. This 
should never happen")
 
     def _get_executor_get_task_log(
-        self, ti: TaskInstance
-    ) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]:
+        self, ti: TaskInstance | TaskInstanceHistory
+    ) -> Callable[[TaskInstance | TaskInstanceHistory, int], tuple[list[str], 
list[str]]]:
         """
         Get the get_task_log method from executor of current task instance.
 
@@ -367,7 +343,7 @@ class FileTaskHandler(logging.Handler):
 
     def _read(
         self,
-        ti: TaskInstance,
+        ti: TaskInstance | TaskInstanceHistory,
         try_number: int,
         metadata: dict[str, Any] | None = None,
     ):
@@ -455,7 +431,8 @@ class FileTaskHandler(logging.Handler):
         return logs, {"end_of_log": end_of_log, "log_pos": log_pos}
 
     @staticmethod
-    def _get_pod_namespace(ti: TaskInstance):
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance | TaskInstanceHistory):
         pod_override = ti.executor_config.get("pod_override")
         namespace = None
         with suppress(Exception):
@@ -463,7 +440,10 @@ class FileTaskHandler(logging.Handler):
         return namespace or conf.get("kubernetes_executor", "namespace")
 
     def _get_log_retrieval_url(
-        self, ti: TaskInstance, log_relative_path: str, log_type: LogType | 
None = None
+        self,
+        ti: TaskInstance | TaskInstanceHistory,
+        log_relative_path: str,
+        log_type: LogType | None = None,
     ) -> tuple[str, str]:
         """Given TI, generate URL with which to fetch logs from service log 
server."""
         if log_type == LogType.TRIGGER:
@@ -487,7 +467,7 @@ class FileTaskHandler(logging.Handler):
 
     def read(
         self,
-        task_instance: TaskInstance,
+        task_instance: TaskInstance | TaskInstanceHistory,
         try_number: int | None = None,
         metadata: dict[str, Any] | None = None,
     ) -> tuple[list[StructuredLogMessage] | str, dict[str, Any]]:
diff --git a/airflow-core/src/airflow/utils/log/log_reader.py 
b/airflow-core/src/airflow/utils/log/log_reader.py
index c00a6877511..4a36b3ef1b0 100644
--- a/airflow-core/src/airflow/utils/log/log_reader.py
+++ b/airflow-core/src/airflow/utils/log/log_reader.py
@@ -33,6 +33,7 @@ if TYPE_CHECKING:
     from sqlalchemy.orm.session import Session
 
     from airflow.models.taskinstance import TaskInstance
+    from airflow.models.taskinstancehistory import TaskInstanceHistory
     from airflow.typing_compat import TypeAlias
 
 LogMessages: TypeAlias = Union[list[StructuredLogMessage], str]
@@ -48,7 +49,10 @@ class TaskLogReader:
     """Number of empty loop iterations before stopping the stream"""
 
     def read_log_chunks(
-        self, ti: TaskInstance, try_number: int | None, metadata
+        self,
+        ti: TaskInstance | TaskInstanceHistory,
+        try_number: int | None,
+        metadata: LogMetadata,
     ) -> tuple[LogMessages, LogMetadata]:
         """
         Read chunks of Task Instance logs.
@@ -70,7 +74,12 @@ class TaskLogReader:
         """
         return self.log_handler.read(ti, try_number, metadata=metadata)
 
-    def read_log_stream(self, ti: TaskInstance, try_number: int | None, 
metadata: dict) -> Iterator[str]:
+    def read_log_stream(
+        self,
+        ti: TaskInstance | TaskInstanceHistory,
+        try_number: int | None,
+        metadata: LogMetadata,
+    ) -> Iterator[str]:
         """
         Continuously read log to the end.
 
@@ -147,7 +156,7 @@ class TaskLogReader:
     @provide_session
     def render_log_filename(
         self,
-        ti: TaskInstance,
+        ti: TaskInstance | TaskInstanceHistory,
         try_number: int | None = None,
         *,
         session: Session = NEW_SESSION,
diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py 
b/airflow-core/tests/unit/utils/test_log_handlers.py
index 6d061375739..5813c22819e 100644
--- a/airflow-core/tests/unit/utils/test_log_handlers.py
+++ b/airflow-core/tests/unit/utils/test_log_handlers.py
@@ -42,6 +42,7 @@ from airflow.jobs.job import Job
 from airflow.jobs.triggerer_job_runner import TriggererJobRunner
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 from airflow.models.trigger import Trigger
 from airflow.providers.standard.operators.python import PythonOperator
 from airflow.utils.log.file_task_handler import (
@@ -606,6 +607,38 @@ class TestFilenameRendering:
         rendered_filename = fth._render_filename(filename_rendering_ti, 42)
         assert expected_filename == rendered_filename
 
+    def test_jinja_id_in_template_for_history(
+        self, create_log_template, create_task_instance, logical_date, session
+    ):
+        """Test that Jinja template using ti.id works for both TaskInstance 
and TaskInstanceHistory"""
+        create_log_template("{{ ti.id }}.log")
+        ti = create_task_instance(
+            dag_id="dag_history_test",
+            task_id="history_task",
+            run_type=DagRunType.SCHEDULED,
+            logical_date=DEFAULT_DATE,
+            catchup=True,
+        )
+        TaskInstanceHistory.record_ti(ti, session=session)
+        session.flush()
+        tih = (
+            session.query(TaskInstanceHistory)
+            .filter_by(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                run_id=ti.run_id,
+                map_index=ti.map_index,
+                try_number=ti.try_number,
+            )
+            .one()
+        )
+        fth = FileTaskHandler("")
+        rendered_ti = fth._render_filename(ti, ti.try_number, session=session)
+        rendered_tih = fth._render_filename(tih, ti.try_number, 
session=session)
+        expected = f"{ti.id}.log"
+        assert rendered_ti == expected
+        assert rendered_tih == expected
+
 
 class TestLogUrl:
     def test_log_retrieval_valid(self, create_task_instance):

Reply via email to