This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 5043cf9784c [v3-1-test] Fix duplicate log reads when resuming from log_pos (#63531) (#63571)
5043cf9784c is described below

commit 5043cf9784ccfa3373c17f85d1799002de30ca8b
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Mar 14 14:48:45 2026 +0800

    [v3-1-test] Fix duplicate log reads when resuming from log_pos (#63531) (#63571)
    
    (cherry picked from commit 00dc4203e779ba889e3d6aa400967b23195b4280)
    
    Co-authored-by: GPK <[email protected]>
---
 .../src/airflow/utils/log/file_task_handler.py     |  2 +-
 airflow-core/tests/unit/utils/test_log_handlers.py | 29 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py
index adc393c1689..132ac8cc8d7 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -687,7 +687,7 @@ class FileTaskHandler(logging.Handler):
 
             # skip log stream until the last position
             if metadata and "log_pos" in metadata:
-                islice(out_stream, metadata["log_pos"])
+                out_stream = islice(out_stream, metadata["log_pos"], None)
             else:
                 # first time reading log, add messages before interleaved log stream
                 out_stream = chain(header, out_stream)
diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py
index 891e08d51f0..db12e713ed3 100644
--- a/airflow-core/tests/unit/utils/test_log_handlers.py
+++ b/airflow-core/tests/unit/utils/test_log_handlers.py
@@ -469,6 +469,35 @@ class TestFileTaskLogHandler:
         assert extract_events(log_handler_output_stream) == ["the log"]
         assert metadata == {"end_of_log": True, "log_pos": 1}
 
+    @patch("airflow.utils.log.file_task_handler.FileTaskHandler._read_from_local")
+    def test__read_when_local_respects_log_pos_metadata(self, mock_read_local, create_task_instance):
+        path = Path(
+            "dag_id=dag_for_testing_local_log_read/run_id=scheduled__2016-01-01T00:00:00+00:00/task_id=task_for_testing_local_log_read/attempt=1.log"
+        )
+        mock_read_local.return_value = (
+            ["the messages"],
+            [convert_list_to_stream(["line 1", "line 2", "line 3"])],
+        )
+        local_log_file_read = create_task_instance(
+            dag_id="dag_for_testing_local_log_read",
+            task_id="task_for_testing_local_log_read",
+            run_type=DagRunType.SCHEDULED,
+            logical_date=DEFAULT_DATE,
+        )
+        fth = FileTaskHandler("")
+
+        log_handler_output_stream, metadata = fth._read(
+            ti=local_log_file_read,
+            try_number=1,
+            metadata={"log_pos": 2},
+        )
+
+        mock_read_local.assert_called_with(path)
+
+        # Should resume from the third line only.
+        assert extract_events(log_handler_output_stream) == ["line 3"]
+        assert metadata == {"end_of_log": True, "log_pos": 3}
+
     def test__read_from_local(self, tmp_path):
         """Tests the behavior of method _read_from_local"""
         path1 = tmp_path / "hello1.log"

Reply via email to