This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0d6e4172e6e Fix log for skipped tasks (#53024)
0d6e4172e6e is described below
commit 0d6e4172e6e4eccabd5c8b12606c10cfd23ee58b
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Jul 8 15:04:27 2025 +0200
Fix log for skipped tasks (#53024)
---
.../core_api/openapi/v2-rest-api-generated.yaml | 2 +-
.../api_fastapi/core_api/routes/public/log.py | 4 +--
.../ui/src/pages/TaskInstance/Logs/Logs.tsx | 2 +-
.../src/airflow/utils/log/file_task_handler.py | 9 ++++++
airflow-core/tests/unit/utils/test_log_handlers.py | 37 ++++++++++++++++++++++
5 files changed, 50 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 6dc989b00d5..37e80e9a28a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -6608,7 +6608,7 @@ paths:
required: true
schema:
type: integer
- exclusiveMinimum: 0
+ minimum: 0
title: Try Number
- name: full_content
in: query
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
index 01cf859f05e..277e705c18d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
@@ -22,7 +22,7 @@ import textwrap
from fastapi import Depends, HTTPException, Request, Response, status
from itsdangerous import BadSignature, URLSafeSerializer
-from pydantic import PositiveInt
+from pydantic import NonNegativeInt, PositiveInt
from sqlalchemy.orm import joinedload
from sqlalchemy.sql import select
@@ -75,7 +75,7 @@ def get_log(
dag_id: str,
dag_run_id: str,
task_id: str,
- try_number: PositiveInt,
+ try_number: NonNegativeInt,
accept: HeaderAcceptJsonOrNdjson,
request: Request,
dag_bag: DagBagDep,
diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx
index c6259979950..a96f0c969c9 100644
--- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx
@@ -91,7 +91,7 @@ export const Logs = () => {
logLevelFilters,
sourceFilters,
taskInstance,
- tryNumber: tryNumber === 0 ? 1 : tryNumber,
+ tryNumber,
});
const externalLogName = useConfig("external_log_name") as string;
diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 84987d1a201..9b086457f60 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -482,6 +482,15 @@ class FileTaskHandler(logging.Handler):
"""
if try_number is None:
try_number = task_instance.try_number
+
+ if task_instance.state == TaskInstanceState.SKIPPED:
+ logs = [
+ StructuredLogMessage( # type: ignore[call-arg]
+ event="Task was skipped, no logs available."
+ )
+ ]
+ return logs, {"end_of_log": True}
+
if try_number is None or try_number < 1:
logs = [
StructuredLogMessage( # type: ignore[call-arg]
diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py
index 5813c22819e..286eae5c842 100644
--- a/airflow-core/tests/unit/utils/test_log_handlers.py
+++ b/airflow-core/tests/unit/utils/test_log_handlers.py
@@ -151,6 +151,43 @@ class TestFileTaskLogHandler:
# Remove the generated tmp log file.
os.remove(log_filename)
+ def test_file_task_handler_when_ti_is_skipped(self, dag_maker):
+ def task_callable(ti):
+ ti.log.info("test")
+
+ with dag_maker("dag_for_testing_file_task_handler", schedule=None):
+ task = PythonOperator(
+ task_id="task_for_testing_file_log_handler",
+ python_callable=task_callable,
+ )
+ dagrun = dag_maker.create_dagrun()
+ ti = TaskInstance(task=task, run_id=dagrun.run_id)
+
+ ti.try_number = 0
+ ti.state = State.SKIPPED
+
+ logger = ti.log
+ ti.log.disabled = False
+
+ file_handler = next(
+ (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
+ )
+ assert file_handler is not None
+
+ set_context(logger, ti)
+ assert file_handler.handler is not None
+ # We expect set_context to generate a file locally.
+ log_filename = file_handler.handler.baseFilename
+ assert os.path.isfile(log_filename)
+ assert log_filename.endswith("0.log"), log_filename
+
+ # Here read() returns a tuple of (list of log messages, metadata dict).
+ logs, metadata = file_handler.read(ti)
+ assert logs[0].event == "Task was skipped, no logs available."
+
+ # Remove the generated tmp log file.
+ os.remove(log_filename)
+
@pytest.mark.xfail(reason="TODO: Needs to be ported over to the new structlog based logging")
def test_file_task_handler(self, dag_maker, session):
def task_callable(ti):