This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b069f1fdbf8 Fix error when retrieving logs of ti not run because of 
upstream failures (#55517)
b069f1fdbf8 is described below

commit b069f1fdbf833488a8da885d62734258fc3a73eb
Author: Sebastián Ortega <[email protected]>
AuthorDate: Tue Sep 16 11:03:07 2025 +0200

    Fix error when retrieving logs of ti not run because of upstream failures 
(#55517)
    
    * Fix TestFileTaskLogHandler tests
    
    * Return a placeholder message when logs are requested for tasks skipped 
because of upstream failures
---
 .../src/airflow/utils/log/file_task_handler.py     |  5 ++++-
 airflow-core/tests/unit/utils/test_log_handlers.py | 25 +++++++++++-----------
 2 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py 
b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 3669aae122b..9954688a41f 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -741,7 +741,10 @@ class FileTaskHandler(logging.Handler):
         if try_number is None:
             try_number = task_instance.try_number
 
-        if try_number == 0 and task_instance.state == 
TaskInstanceState.SKIPPED:
+        if try_number == 0 and task_instance.state in (
+            TaskInstanceState.SKIPPED,
+            TaskInstanceState.UPSTREAM_FAILED,
+        ):
             logs = [StructuredLogMessage(event="Task was skipped, no logs 
available.")]
             return chain(logs), {"end_of_log": True}
 
diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py 
b/airflow-core/tests/unit/utils/test_log_handlers.py
index b5d939d52e7..6d2caaa8a43 100644
--- a/airflow-core/tests/unit/utils/test_log_handlers.py
+++ b/airflow-core/tests/unit/utils/test_log_handlers.py
@@ -79,7 +79,7 @@ from tests_common.test_utils.file_task_handler import (
 )
 from tests_common.test_utils.markers import 
skip_if_force_lowest_dependencies_marker
 
-pytestmark = [pytest.mark.db_test, pytest.mark.xfail()]
+pytestmark = [pytest.mark.db_test]
 
 DEFAULT_DATE = pendulum.datetime(2016, 1, 1)
 TASK_LOGGER = "airflow.task"
@@ -156,7 +156,8 @@ class TestFileTaskLogHandler:
         # Remove the generated tmp log file.
         os.remove(log_filename)
 
-    def test_file_task_handler_when_ti_is_skipped(self, dag_maker):
+    @pytest.mark.parametrize("ti_state", [TaskInstanceState.SKIPPED, 
TaskInstanceState.UPSTREAM_FAILED])
+    def test_file_task_handler_when_ti_is_not_run(self, dag_maker, ti_state):
         def task_callable(ti):
             ti.log.info("test")
 
@@ -170,10 +171,10 @@ class TestFileTaskLogHandler:
         ti = TaskInstance(task=task, run_id=dagrun.run_id, 
dag_version_id=dag_version.id)
 
         ti.try_number = 0
-        ti.state = State.SKIPPED
+        ti.state = ti_state
 
-        logger = ti.log
-        ti.log.disabled = False
+        logger = logging.getLogger(TASK_LOGGER)
+        logger.disabled = False
 
         file_handler = next(
             (handler for handler in logger.handlers if handler.name == 
FILE_TASK_HANDLER), None
@@ -295,8 +296,8 @@ class TestFileTaskLogHandler:
                 ti.executor = executor_name
             ti.try_number = 1
             ti.state = TaskInstanceState.RUNNING
-            logger = ti.log
-            ti.log.disabled = False
+            logger = logging.getLogger(TASK_LOGGER)
+            logger.disabled = False
 
             file_handler = next(
                 (handler for handler in logger.handlers if handler.name == 
FILE_TASK_HANDLER), None
@@ -344,8 +345,8 @@ class TestFileTaskLogHandler:
         ti.try_number = 2
         ti.state = State.RUNNING
 
-        logger = ti.log
-        ti.log.disabled = False
+        logger = logging.getLogger(TASK_LOGGER)
+        logger.disabled = False
 
         file_handler = next(
             (handler for handler in logger.handlers if handler.name == 
FILE_TASK_HANDLER), None
@@ -396,8 +397,8 @@ class TestFileTaskLogHandler:
         ti.try_number = 1
         ti.state = State.RUNNING
 
-        logger = ti.log
-        ti.log.disabled = False
+        logger = logging.getLogger(TASK_LOGGER)
+        logger.disabled = False
 
         file_handler = next(
             (handler for handler in logger.handlers if handler.name == 
FILE_TASK_HANDLER), None
@@ -413,7 +414,7 @@ class TestFileTaskLogHandler:
         assert log_filename.endswith("1.log"), log_filename
 
         # mock to generate 2000 lines of log, the total size is larger than 
max_bytes_size
-        for i in range(1, 2000):
+        for i in range(1, 3000):
             logger.info("this is a Test. %s", i)
 
         # this is the rotate log file

Reply via email to