This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-11-test by this push:
new 47a1ca7bd94 fix: gracefully handle 404 from worker log server for
historical retry attempts (#63002)
47a1ca7bd94 is described below
commit 47a1ca7bd94538bf4c053701d1d8f9f7a1c30327
Author: Pradeep Kalluri <[email protected]>
AuthorDate: Fri Mar 6 19:03:32 2026 +0000
fix: gracefully handle 404 from worker log server for historical retry
attempts (#63002)
* fix: gracefully handle 404 from worker log server for historical retry
attempts
* fix mypy
---------
Co-authored-by: Elad Kalif <[email protected]>
---
airflow/utils/log/file_task_handler.py | 14 ++++++
tests/utils/log/test_file_task_handler.py | 84 +++++++++++++++++++++++++++++++
2 files changed, 98 insertions(+)
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 57032ecdcc8..2a64e371942 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -617,6 +617,20 @@ class FileTaskHandler(logging.Handler):
"See more at
https://airflow.apache.org/docs/apache-airflow/"
"stable/configurations-ref.html#secret-key"
)
+ elif response.status_code == 404:
+ worker_log_full_path = Path(self.local_base, worker_log_rel_path)
+ fallback_messages, fallback_logs = self._read_from_local(worker_log_full_path)
+ if fallback_logs:
+ messages.extend(fallback_messages)
+ logs.extend(fallback_logs)
+ else:
+ messages.append(
+ f"Log file not found on worker '{ti.hostname}'. "
+ f"This attempt may have run on a different worker whose logs "
+ f"are no longer accessible. "
+ f"Consider configuring remote logging (S3, GCS, etc.) for log persistence."
+ )
+ return messages, logs
# Check if the resource was properly fetched
response.raise_for_status()
if response.text:
diff --git a/tests/utils/log/test_file_task_handler.py b/tests/utils/log/test_file_task_handler.py
new file mode 100644
index 00000000000..710de4aaf14
--- /dev/null
+++ b/tests/utils/log/test_file_task_handler.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+
+class TestFileTaskHandlerLogServer:
+ """Tests for _read_from_logs_server 404 handling."""
+
+ def setup_method(self):
+ self.handler = FileTaskHandler(base_log_folder="/tmp/test_logs")
+ self.ti = MagicMock()
+ self.ti.hostname = "worker-1"
+ self.ti.triggerer_job = None
+ self.ti.task = None
+
+ @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+ @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+ @patch.object(FileTaskHandler, "_read_from_local")
+ def test_404_falls_back_to_local_when_available(self, mock_read_local, mock_get_url, mock_fetch):
+ """When log server returns 404 and local logs exist, use local logs."""
+ mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+ mock_response = MagicMock()
+ mock_response.status_code = 404
+ mock_fetch.return_value = mock_response
+ mock_read_local.return_value = (["Found local files:"], ["log content"])
+
+ messages, logs = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+ assert logs == ["log content"]
+ assert "Found local files:" in messages
+ mock_read_local.assert_called_once_with(Path("/tmp/test_logs", "dag/run/task/1.log"))
+
+ @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+ @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+ @patch.object(FileTaskHandler, "_read_from_local")
+ def test_404_shows_clear_message_when_no_local_fallback(self, mock_read_local, mock_get_url, mock_fetch):
+ """When log server returns 404 and no local logs exist, show helpful message."""
+ mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+ mock_response = MagicMock()
+ mock_response.status_code = 404
+ mock_fetch.return_value = mock_response
+ mock_read_local.return_value = ([], [])
+
+ messages, logs = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+ assert len(messages) == 1
+ assert "worker-1" in messages[0]
+ assert "no longer accessible" in messages[0]
+ assert "remote logging" in messages[0]
+ assert logs == []
+
+ @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+ @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+ def test_403_shows_secret_key_message(self, mock_get_url, mock_fetch):
+ """When log server returns 403, show secret key configuration message."""
+ mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+ mock_response = MagicMock()
+ mock_response.status_code = 403
+ mock_fetch.return_value = mock_response
+ mock_response.raise_for_status.side_effect = Exception("403")
+
+ messages, logs = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+ assert any("secret_key" in m for m in messages)