This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-11-test by this push:
     new 47a1ca7bd94 fix: gracefully handle 404 from worker log server for historical retry attempts (#63002)
47a1ca7bd94 is described below

commit 47a1ca7bd94538bf4c053701d1d8f9f7a1c30327
Author: Pradeep Kalluri <[email protected]>
AuthorDate: Fri Mar 6 19:03:32 2026 +0000

    fix: gracefully handle 404 from worker log server for historical retry attempts (#63002)
    
    * fix: gracefully handle 404 from worker log server for historical retry attempts
    
    * fix mypy
    
    ---------
    
    Co-authored-by: Elad Kalif <[email protected]>
---
 airflow/utils/log/file_task_handler.py    | 14 ++++++
 tests/utils/log/test_file_task_handler.py | 84 +++++++++++++++++++++++++++++++
 2 files changed, 98 insertions(+)

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 57032ecdcc8..2a64e371942 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -617,6 +617,20 @@ class FileTaskHandler(logging.Handler):
                     "See more at https://airflow.apache.org/docs/apache-airflow/"
                     "stable/configurations-ref.html#secret-key"
                 )
+            elif response.status_code == 404:
+                worker_log_full_path = Path(self.local_base, worker_log_rel_path)
+                fallback_messages, fallback_logs = self._read_from_local(worker_log_full_path)
+                if fallback_logs:
+                    messages.extend(fallback_messages)
+                    logs.extend(fallback_logs)
+                else:
+                    messages.append(
+                        f"Log file not found on worker '{ti.hostname}'. "
+                        f"This attempt may have run on a different worker whose logs "
+                        f"are no longer accessible. "
+                        f"Consider configuring remote logging (S3, GCS, etc.) for log persistence."
+                    )
+                return messages, logs
             # Check if the resource was properly fetched
             response.raise_for_status()
             if response.text:
diff --git a/tests/utils/log/test_file_task_handler.py b/tests/utils/log/test_file_task_handler.py
new file mode 100644
index 00000000000..710de4aaf14
--- /dev/null
+++ b/tests/utils/log/test_file_task_handler.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+
+class TestFileTaskHandlerLogServer:
+    """Tests for _read_from_logs_server 404 handling."""
+
+    def setup_method(self):
+        self.handler = FileTaskHandler(base_log_folder="/tmp/test_logs")
+        self.ti = MagicMock()
+        self.ti.hostname = "worker-1"
+        self.ti.triggerer_job = None
+        self.ti.task = None
+
+    @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+    @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+    @patch.object(FileTaskHandler, "_read_from_local")
+    def test_404_falls_back_to_local_when_available(self, mock_read_local, mock_get_url, mock_fetch):
+        """When log server returns 404 and local logs exist, use local logs."""
+        mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+        mock_response = MagicMock()
+        mock_response.status_code = 404
+        mock_fetch.return_value = mock_response
+        mock_read_local.return_value = (["Found local files:"], ["log content"])
+
+        messages, logs = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+        assert logs == ["log content"]
+        assert "Found local files:" in messages
+        mock_read_local.assert_called_once_with(Path("/tmp/test_logs", "dag/run/task/1.log"))
+
+    @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+    @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+    @patch.object(FileTaskHandler, "_read_from_local")
+    def test_404_shows_clear_message_when_no_local_fallback(self, mock_read_local, mock_get_url, mock_fetch):
+        """When log server returns 404 and no local logs exist, show helpful message."""
+        mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+        mock_response = MagicMock()
+        mock_response.status_code = 404
+        mock_fetch.return_value = mock_response
+        mock_read_local.return_value = ([], [])
+
+        messages, logs = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+        assert len(messages) == 1
+        assert "worker-1" in messages[0]
+        assert "no longer accessible" in messages[0]
+        assert "remote logging" in messages[0]
+        assert logs == []
+
+    @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+    @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+    def test_403_shows_secret_key_message(self, mock_get_url, mock_fetch):
+        """When log server returns 403, show secret key configuration message."""
+        mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+        mock_response = MagicMock()
+        mock_response.status_code = 403
+        mock_fetch.return_value = mock_response
+        mock_response.raise_for_status.side_effect = Exception("403")
+
+        messages, logs = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+        assert any("secret_key" in m for m in messages)

Reply via email to