This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 44aa7321832 Warn when remote_log_conn_id is configured but not found 
(#67510)
44aa7321832 is described below

commit 44aa732183258aeb041a5c02d1296df9bc2ef89f
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu May 28 23:20:17 2026 +0200

    Warn when remote_log_conn_id is configured but not found (#67510)
    
    ``GCSRemoteLogIO.hook`` silently swallowed ``AirflowNotFoundException``
    when the configured ``remote_log_conn_id`` did not exist, falling back
    to Application Default Credentials without any signal. That made a
    misconfigured remote-log connection (logs effectively going through
    the wrong credentials) invisible to operators — a security-control
    failure with no operational indication.
    
    Replace the silent ``pass`` with a ``self.log.warning`` mentioning the
    unresolved conn_id so the fallback is visible in worker logs. The
    behaviour (falling back to ADC) is unchanged for the case where no
    ``remote_log_conn_id`` is configured.
---
 .../providers/google/cloud/log/gcs_task_handler.py | 11 ++++-
 .../unit/google/cloud/log/test_gcs_task_handler.py | 47 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git 
a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py 
b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
index 8c316f038a2..24dcfd5f01c 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -95,7 +95,16 @@ class GCSRemoteLogIO(LoggingMixin):  # noqa: D101
             try:
                 return GCSHook(gcp_conn_id=conn_id)
             except AirflowNotFoundException:
-                pass
+                # The operator configured a ``remote_log_conn_id`` that 
doesn't exist. We
+                # fall back to Application Default Credentials, but the 
operator almost
+                # certainly didn't mean that — a misconfigured remote-log 
connection is a
+                # security control failure (logs going through the wrong 
credentials) and
+                # must be visible in the worker log.
+                self.log.warning(
+                    "remote_log_conn_id %r is not configured; falling back to 
Application "
+                    "Default Credentials for GCS log handler.",
+                    conn_id,
+                )
         return None
 
     @cached_property
diff --git 
a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py 
b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py
index f0fbc73ad30..a592d2f29a6 100644
--- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py
+++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py
@@ -27,6 +27,7 @@ from unittest.mock import MagicMock
 
 import pytest
 
+from airflow.providers.common.compat.sdk import AirflowNotFoundException
 from airflow.providers.google.cloud.log.gcs_task_handler import 
GCSRemoteLogIO, GCSTaskHandler
 from airflow.utils.state import TaskInstanceState
 from airflow.utils.timezone import datetime
@@ -596,3 +597,49 @@ class TestGCSTaskHandler:
             gcs_log_folder="gs://bucket/remote/log/location",
             filename_template=None,
         )
+
+
+class TestGCSRemoteLogIOMisconfiguredConn:
+    """A misconfigured ``remote_log_conn_id`` must surface as a warning.
+
+    Silently swallowing the ``AirflowNotFoundException`` lets an operator 
believe they had
+    configured a specific service-account key for log uploads when in fact 
Application
+    Default Credentials are being used — a security-control failure that 
should be visible.
+    """
+
+    @mock.patch("airflow.providers.google.cloud.log.gcs_task_handler.GCSHook")
+    def test_hook_warns_when_remote_log_conn_id_missing(self, mock_hook, 
caplog):
+        mock_hook.side_effect = AirflowNotFoundException("conn 'nonexistent' 
not found")
+        remote_io = GCSRemoteLogIO(
+            remote_base="gs://bucket/remote/log/location",
+            base_log_folder="/tmp/airflow-test-logs",
+            delete_local_copy=False,
+        )
+
+        with conf_vars({("logging", "remote_log_conn_id"): "nonexistent"}):
+            with caplog.at_level(logging.WARNING):
+                assert remote_io.hook is None
+
+        warnings_emitted = [r for r in caplog.records if r.levelno == 
logging.WARNING]
+        assert any("nonexistent" in r.getMessage() for r in warnings_emitted), 
(
+            "expected a warning mentioning the misconfigured conn_id, got: "
+            f"{[r.getMessage() for r in warnings_emitted]}"
+        )
+
+    @mock.patch("airflow.providers.google.cloud.log.gcs_task_handler.GCSHook")
+    def test_hook_silent_when_no_remote_log_conn_id_configured(self, 
mock_hook, caplog):
+        """No ``remote_log_conn_id`` set → silent fall-through to ADC; no 
warning."""
+        remote_io = GCSRemoteLogIO(
+            remote_base="gs://bucket/remote/log/location",
+            base_log_folder="/tmp/airflow-test-logs",
+            delete_local_copy=False,
+        )
+
+        with conf_vars({("logging", "remote_log_conn_id"): ""}):
+            with caplog.at_level(logging.WARNING):
+                assert remote_io.hook is None
+
+        assert not any(
+            "remote_log_conn_id" in r.getMessage() for r in caplog.records if 
r.levelno == logging.WARNING
+        )
+        mock_hook.assert_not_called()

Reply via email to