This is an automated email from the ASF dual-hosted git repository.
gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 00dc4203e77 Fix duplicate log reads when resuming from log_pos (#63531)
00dc4203e77 is described below
commit 00dc4203e779ba889e3d6aa400967b23195b4280
Author: GPK <[email protected]>
AuthorDate: Fri Mar 13 23:02:36 2026 +0000
Fix duplicate log reads when resuming from log_pos (#63531)
---
.../src/airflow/utils/log/file_task_handler.py | 2 +-
airflow-core/tests/unit/utils/test_log_handlers.py | 29 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 02545599271..1481d0a315e 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -684,7 +684,7 @@ class FileTaskHandler(logging.Handler):
# skip log stream until the last position
if metadata and "log_pos" in metadata:
- islice(out_stream, metadata["log_pos"])
+ out_stream = islice(out_stream, metadata["log_pos"], None)
else:
# first time reading log, add messages before interleaved log stream
out_stream = chain(header, out_stream)
diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py
index 4458429a01f..6a93e940e72 100644
--- a/airflow-core/tests/unit/utils/test_log_handlers.py
+++ b/airflow-core/tests/unit/utils/test_log_handlers.py
@@ -475,6 +475,35 @@ class TestFileTaskLogHandler:
assert extract_events(log_handler_output_stream) == ["the log"]
assert metadata == {"end_of_log": True, "log_pos": 1}
+ @patch("airflow.utils.log.file_task_handler.FileTaskHandler._read_from_local")
+ def test__read_when_local_respects_log_pos_metadata(self, mock_read_local, create_task_instance):
+ path = Path(
+ "dag_id=dag_for_testing_local_log_read/run_id=scheduled__2016-01-01T00:00:00+00:00/task_id=task_for_testing_local_log_read/attempt=1.log"
+ )
+ mock_read_local.return_value = (
+ ["the messages"],
+ [convert_list_to_stream(["line 1", "line 2", "line 3"])],
+ )
+ local_log_file_read = create_task_instance(
+ dag_id="dag_for_testing_local_log_read",
+ task_id="task_for_testing_local_log_read",
+ run_type=DagRunType.SCHEDULED,
+ logical_date=DEFAULT_DATE,
+ )
+ fth = FileTaskHandler("")
+
+ log_handler_output_stream, metadata = fth._read(
+ ti=local_log_file_read,
+ try_number=1,
+ metadata={"log_pos": 2},
+ )
+
+ mock_read_local.assert_called_with(path)
+
+ # Should resume from the third line only.
+ assert extract_events(log_handler_output_stream) == ["line 3"]
+ assert metadata == {"end_of_log": True, "log_pos": 3}
+
def test__read_from_local(self, tmp_path):
"""Tests the behavior of method _read_from_local"""
path1 = tmp_path / "hello1.log"