This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e3f689e14e Fix remote logging S3 connection retrieval in supervisor context (#59247)
8e3f689e14e is described below

commit 8e3f689e14e26b22298f598a1cc30e21cc2eb792
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Dec 9 21:13:11 2025 +0100

    Fix remote logging S3 connection retrieval in supervisor context (#59247)
    
    * Fix remote logging S3 connection retrieval in supervisor context
    
    Workers were unable to retrieve S3 connection credentials for remote
    logging even though the credentials were successfully fetched via the
    API. This caused task log uploads to S3 to fail with errors like
    "Unable to find AWS Connection ID" and to fall back to the boto3
    credential strategy.
    
    The root cause was that when _remote_logging_conn set the connection
    environment variable, Connection deserialization used the core
    Connection class instead of the SDK Connection class. The core
    Connection class doesn't have the from_uri() method needed for proper
    URI deserialization in the supervisor context.
    
    This fix sets _AIRFLOW_PROCESS_CONTEXT=client in the _remote_logging_conn
    context manager, ensuring that Connection deserialization uses the SDK
    Connection class, which properly handles URI deserialization from
    environment variables.
    
    Fixes #58140
    
    * fixup! Fix remote logging S3 connection retrieval in supervisor context
---
 .../src/airflow/sdk/execution_time/supervisor.py   | 16 ++++-
 .../task_sdk/execution_time/test_supervisor.py     | 75 +++++++++++++++++++++-
 2 files changed, 87 insertions(+), 4 deletions(-)
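
For context, a minimal sketch of the class dispatch this fix relies on. The
helper name _get_connection_class() comes from the test comments below; the
body here is an illustrative assumption, not the actual implementation:

    import os

    def _get_connection_class():
        # Illustrative assumption: choose the SDK Connection class when this
        # process is marked as a task-SDK client, otherwise fall back to the
        # core Connection class.
        if os.environ.get("_AIRFLOW_PROCESS_CONTEXT") == "client":
            # SDK class: provides from_uri() for URI deserialization
            from airflow.sdk.definitions.connection import Connection
        else:
            # Core class: lacks from_uri()
            from airflow.models.connection import Connection
        return Connection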

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index e9b200d4c4b..b59b90c73f9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -910,15 +910,25 @@ def _remote_logging_conn(client: Client):
 
     if conn:
         key = f"AIRFLOW_CONN_{conn_id.upper()}"
-        old = os.getenv(key)
+        old_conn = os.getenv(key)
+        old_context = os.getenv("_AIRFLOW_PROCESS_CONTEXT")
+
         os.environ[key] = conn.get_uri()
+        # Set process context to "client" so that Connection deserialization uses SDK Connection class
+        # which has from_uri() method, instead of core Connection class
+        os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"
         try:
             yield
         finally:
-            if old is None:
+            if old_conn is None:
                 del os.environ[key]
             else:
-                os.environ[key] = old
+                os.environ[key] = old_conn
+
+            if old_context is None:
+                del os.environ["_AIRFLOW_PROCESS_CONTEXT"]
+            else:
+                os.environ["_AIRFLOW_PROCESS_CONTEXT"] = old_context
 
 
 @attrs.define(kw_only=True)
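
Illustrative usage of the patched context manager, assuming only what the
hunk above shows: both environment variables are set on entry and restored,
or deleted, on exit. Here client stands in for an API client such as the one
the tests build with make_client():

    import os
    from airflow.sdk.execution_time.supervisor import _remote_logging_conn

    assert os.getenv("_AIRFLOW_PROCESS_CONTEXT") is None  # assumed unset

    with _remote_logging_conn(client):
        # Inside the block, connection resolution sees both variables:
        # AIRFLOW_CONN_<CONN_ID> holds the connection serialized as a URI,
        # and the process context selects the SDK Connection class.
        assert os.environ["_AIRFLOW_PROCESS_CONTEXT"] == "client"

    # After exit, both variables are back to their previous state
    # (removed here, since they were unset before entering).
    assert os.getenv("_AIRFLOW_PROCESS_CONTEXT") is None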
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 285e36ca7d8..f964cf764f4 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2676,7 +2676,8 @@ def test_remote_logging_conn(remote_logging, remote_conn, expected_env, monkeypa
             with _remote_logging_conn(client):
                 new_keys = os.environ.keys() - env.keys()
                 if remote_logging:
-                    assert new_keys == {expected_env}
+                    # _remote_logging_conn sets both the connection env var and _AIRFLOW_PROCESS_CONTEXT
+                    assert new_keys == {expected_env, "_AIRFLOW_PROCESS_CONTEXT"}
                 else:
                     assert not new_keys
 
@@ -2707,6 +2708,78 @@ def test_remote_logging_conn(remote_logging, remote_conn, expected_env, monkeypa
                assert connection_available["conn_uri"] is not None, "Connection URI was None during upload"
 
 
+def test_remote_logging_conn_sets_process_context(monkeypatch, mocker):
+    """
+    Test that _remote_logging_conn sets _AIRFLOW_PROCESS_CONTEXT=client.
+    """
+    pytest.importorskip("airflow.providers.amazon", reason="'amazon' provider not installed")
+    from airflow.models.connection import Connection as CoreConnection
+    from airflow.sdk.definitions.connection import Connection as SDKConnection
+
+    monkeypatch.delitem(sys.modules, "airflow.logging_config")
+    monkeypatch.delitem(sys.modules, "airflow.config_templates.airflow_local_settings", raising=False)
+
+    conn_id = "s3_conn_logs"
+    conn_uri = "aws:///?region_name=us-east-1"
+
+    def handle_request(request: httpx.Request) -> httpx.Response:
+        return httpx.Response(
+            status_code=200,
+            json={
+                "conn_id": conn_id,
+                "conn_type": "aws",
+                "host": None,
+                "login": None,
+                "password": None,
+                "port": None,
+                "schema": None,
+                "extra": '{"region_name": "us-east-1"}',
+            },
+        )
+
+    with conf_vars(
+        {
+            ("logging", "remote_logging"): "True",
+            ("logging", "remote_base_log_folder"): "s3://bucket/logs",
+            ("logging", "remote_log_conn_id"): conn_id,
+        }
+    ):
+        with conf_vars(
+            {
+                ("logging", "remote_log_conn_id"): conn_id,
+            }
+        ):
+            client = make_client(transport=httpx.MockTransport(handle_request))
+
+            assert os.getenv("_AIRFLOW_PROCESS_CONTEXT") is None
+
+            conn_env_key = f"AIRFLOW_CONN_{conn_id.upper()}"
+
+            with _remote_logging_conn(client):
+                assert os.getenv("_AIRFLOW_PROCESS_CONTEXT") == "client"
+
+                assert conn_env_key in os.environ
+                stored_uri = os.environ[conn_env_key]
+                assert stored_uri == conn_uri
+
+                # Verify that Connection.get() uses SDK Connection class when _AIRFLOW_PROCESS_CONTEXT=client
+                # Without _AIRFLOW_PROCESS_CONTEXT=client, _get_connection_class() would return core
+                # Connection. While core Connection can handle URI deserialization via its __init__,
+                # using SDK Connection ensures consistency and proper behavior in supervisor context.
+                from airflow.sdk.execution_time.context import _get_connection
+
+                retrieved_conn = _get_connection(conn_id)
+
+                assert isinstance(retrieved_conn, SDKConnection)
+                assert not isinstance(retrieved_conn, CoreConnection)
+                assert retrieved_conn.conn_id == conn_id
+                assert retrieved_conn.conn_type == "aws"
+
+            # Verify _AIRFLOW_PROCESS_CONTEXT and the connection env var are cleaned up
+            assert os.getenv("_AIRFLOW_PROCESS_CONTEXT") is None
+            assert conn_env_key not in os.environ
+
+
 class TestSignalRetryLogic:
     """Test retry logic for exit codes (signals and non-signal failures) in 
ActivitySubprocess."""
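
As background for the AIRFLOW_CONN_* assertions in the test above, a
standalone sketch of the URI round-trip. The from_uri() method name comes
from the commit message; its exact call shape is an assumption:

    import os
    from airflow.sdk.definitions.connection import Connection

    # Airflow resolves AIRFLOW_CONN_<CONN_ID> environment variables to
    # connections; the value is the connection serialized as a URI.
    os.environ["AIRFLOW_CONN_S3_CONN_LOGS"] = "aws:///?region_name=us-east-1"

    # Assumed call shape: parse the serialized URI back into a Connection.
    conn = Connection.from_uri(os.environ["AIRFLOW_CONN_S3_CONN_LOGS"])
    print(conn.conn_type)  # expected: "aws"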
 
