This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ef87426904c Surface remote-log upload failures via structured warnings (#66571)
ef87426904c is described below

commit ef87426904c6bde23139b773b802fbafe3f858d2
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue May 19 13:20:05 2026 +0200

    Surface remote-log upload failures via structured warnings (#66571)
    
    * Surface remote-log upload failures via structured warnings
    
    `upload_to_remote()` silently returned in three failure paths (handler
    load failure, log-path resolution exception, handler.upload exception)
    with no signal to the operator that logs were not reaching the remote
    system. The supervisor's `_upload_logs` already logs the outermost
    exception, but the inner paths inside `upload_to_remote()` were silent
    — so a misconfigured remote handler or a transient remote-system
    outage would degrade silently while local-only logs continued.
    
    Add a dedicated `airflow.logging.remote` structlog logger and emit a
    `log.warning` at each of the three failure paths with the TI id and
    the underlying error string. No behaviour change otherwise — failures
    still fall through softly so a bad remote handler doesn't abort the
    task lifecycle.
    
    Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-004).
    
    * Address review feedback on remote-log upload warnings
    
    - Use ti.id directly (available since 3.0) instead of getattr fallback.
    - Pass exc_info=exc to structlog so the traceback is preserved on
      path-resolution and upload failures.
    - Trim past-tense from the docstring comment.
    - Add tests covering the three failure paths and the silent
      success / no-path cases.
---
 task-sdk/src/airflow/sdk/log.py     |  26 +++++++-
 task-sdk/tests/task_sdk/test_log.py | 120 ++++++++++++++++++++++++++++++++++++
 2 files changed, 144 insertions(+), 2 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index f93abe1a722..9542baa4046 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -226,20 +226,42 @@ def relative_path_from_logger(logger) -> Path | None:
 
 def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI):
     raw_logger = getattr(logger, "_logger")
+    # Dedicated logger for remote-upload visibility — operators relying on
+    # remote log handlers need a way to see when those handlers fail to load
+    # or fail to upload.
+    upload_log = structlog.get_logger("airflow.logging.remote")
 
     handler = load_remote_log_handler()
     if not handler:
+        upload_log.warning(
+            "remote_log_handler_unavailable",
+            ti_id=str(ti.id),
+            note="Remote log handler could not be loaded; logs will be available locally only.",
+        )
         return
 
     try:
         relative_path = relative_path_from_logger(raw_logger)
-    except Exception:
+    except Exception as exc:
+        upload_log.warning(
+            "remote_log_path_resolution_failed",
+            ti_id=str(ti.id),
+            exc_info=exc,
+        )
         return
     if not relative_path:
         return
 
     log_relative_path = relative_path.as_posix()
-    handler.upload(log_relative_path, ti)
+    try:
+        handler.upload(log_relative_path, ti)
+    except Exception as exc:
+        upload_log.warning(
+            "remote_log_upload_failed",
+            ti_id=str(ti.id),
+            log_relative_path=log_relative_path,
+            exc_info=exc,
+        )
 
 
 def mask_secret(secret: JsonValue, name: str | None = None) -> None:
diff --git a/task-sdk/tests/task_sdk/test_log.py b/task-sdk/tests/task_sdk/test_log.py
new file mode 100644
index 00000000000..6b7b22a4cb6
--- /dev/null
+++ b/task-sdk/tests/task_sdk/test_log.py
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import structlog
+import structlog.testing
+from uuid6 import uuid7
+
+from airflow.sdk import log as sdk_log
+
+
+def _make_ti():
+    ti = mock.MagicMock()
+    ti.id = uuid7()
+    return ti
+
+
+def _make_logger():
+    """Build a FilteringBoundLogger-like object exposing ``_logger``."""
+    logger = mock.MagicMock()
+    logger._logger = mock.MagicMock()
+    return logger
+
+
+class TestUploadToRemote:
+    def test_warns_when_handler_unavailable(self):
+        ti = _make_ti()
+        with (
+            mock.patch.object(sdk_log, "load_remote_log_handler", return_value=None),
+            structlog.testing.capture_logs() as captured,
+        ):
+            sdk_log.upload_to_remote(_make_logger(), ti)
+
+        events = [e for e in captured if e["event"] == "remote_log_handler_unavailable"]
+        assert len(events) == 1
+        assert events[0]["log_level"] == "warning"
+        assert events[0]["ti_id"] == str(ti.id)
+
+    def test_warns_when_path_resolution_fails(self):
+        ti = _make_ti()
+        handler = mock.MagicMock()
+        boom = RuntimeError("cannot resolve path")
+        with (
+            mock.patch.object(sdk_log, "load_remote_log_handler", return_value=handler),
+            mock.patch.object(sdk_log, "relative_path_from_logger", side_effect=boom),
+            structlog.testing.capture_logs() as captured,
+        ):
+            sdk_log.upload_to_remote(_make_logger(), ti)
+
+        events = [e for e in captured if e["event"] == "remote_log_path_resolution_failed"]
+        assert len(events) == 1
+        assert events[0]["log_level"] == "warning"
+        assert events[0]["ti_id"] == str(ti.id)
+        assert events[0]["exc_info"] is boom
+        handler.upload.assert_not_called()
+
+    def test_warns_when_upload_fails(self, tmp_path):
+        ti = _make_ti()
+        handler = mock.MagicMock()
+        boom = RuntimeError("s3 unreachable")
+        handler.upload.side_effect = boom
+        relative = tmp_path / "dag_id" / "run_id" / "task.log"
+        with (
+            mock.patch.object(sdk_log, "load_remote_log_handler", return_value=handler),
+            mock.patch.object(sdk_log, "relative_path_from_logger", return_value=relative),
+            structlog.testing.capture_logs() as captured,
+        ):
+            sdk_log.upload_to_remote(_make_logger(), ti)
+
+        events = [e for e in captured if e["event"] == "remote_log_upload_failed"]
+        assert len(events) == 1
+        assert events[0]["log_level"] == "warning"
+        assert events[0]["ti_id"] == str(ti.id)
+        assert events[0]["log_relative_path"] == relative.as_posix()
+        assert events[0]["exc_info"] is boom
+        handler.upload.assert_called_once_with(relative.as_posix(), ti)
+
+    def test_silent_when_relative_path_is_none(self):
+        ti = _make_ti()
+        handler = mock.MagicMock()
+        with (
+            mock.patch.object(sdk_log, "load_remote_log_handler", return_value=handler),
+            mock.patch.object(sdk_log, "relative_path_from_logger", return_value=None),
+            structlog.testing.capture_logs() as captured,
+        ):
+            sdk_log.upload_to_remote(_make_logger(), ti)
+
+        assert captured == []
+        handler.upload.assert_not_called()
+
+    def test_silent_on_success(self, tmp_path):
+        ti = _make_ti()
+        handler = mock.MagicMock()
+        relative = tmp_path / "dag_id" / "run_id" / "task.log"
+        with (
+            mock.patch.object(sdk_log, "load_remote_log_handler", return_value=handler),
+            mock.patch.object(sdk_log, "relative_path_from_logger", return_value=relative),
+            structlog.testing.capture_logs() as captured,
+        ):
+            sdk_log.upload_to_remote(_make_logger(), ti)
+
+        assert captured == []
+        handler.upload.assert_called_once_with(relative.as_posix(), ti)

Reply via email to