This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8e3f689e14e Fix remote logging S3 connection retrieval in supervisor context (#59247)
8e3f689e14e is described below
commit 8e3f689e14e26b22298f598a1cc30e21cc2eb792
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Dec 9 21:13:11 2025 +0100
Fix remote logging S3 connection retrieval in supervisor context (#59247)
* Fix remote logging S3 connection retrieval in supervisor context
Workers were unable to retrieve S3 connection credentials for remote
logging even though the credentials were successfully fetched via the
API. This caused task logs to fail uploading to S3 with errors like
"Unable to find AWS Connection ID" and falling back to boto3 credential
strategy.
The root cause was that when _remote_logging_conn sets the connection
environment variable, Connection deserialization was using the core
Connection class instead of the SDK Connection class. The core Connection
class doesn't have the from_uri() method needed for proper URI
deserialization in the supervisor context.
This fix sets _AIRFLOW_PROCESS_CONTEXT=client in the _remote_logging_conn
context manager, ensuring that Connection deserialization uses the SDK
Connection class which properly handles URI deserialization from
environment variables.
Fixes #58140
* fixup! Fix remote logging S3 connection retrieval in supervisor context
---
.../src/airflow/sdk/execution_time/supervisor.py | 16 ++++-
.../task_sdk/execution_time/test_supervisor.py | 75 +++++++++++++++++++++-
2 files changed, 87 insertions(+), 4 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index e9b200d4c4b..b59b90c73f9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -910,15 +910,25 @@ def _remote_logging_conn(client: Client):
if conn:
key = f"AIRFLOW_CONN_{conn_id.upper()}"
- old = os.getenv(key)
+ old_conn = os.getenv(key)
+ old_context = os.getenv("_AIRFLOW_PROCESS_CONTEXT")
+
os.environ[key] = conn.get_uri()
+ # Set process context to "client" so that Connection deserialization uses SDK Connection class
+ # which has from_uri() method, instead of core Connection class
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"
try:
yield
finally:
- if old is None:
+ if old_conn is None:
del os.environ[key]
else:
- os.environ[key] = old
+ os.environ[key] = old_conn
+
+ if old_context is None:
+ del os.environ["_AIRFLOW_PROCESS_CONTEXT"]
+ else:
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = old_context
@attrs.define(kw_only=True)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 285e36ca7d8..f964cf764f4 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2676,7 +2676,8 @@ def test_remote_logging_conn(remote_logging, remote_conn, expected_env, monkeypa
with _remote_logging_conn(client):
new_keys = os.environ.keys() - env.keys()
if remote_logging:
- assert new_keys == {expected_env}
+ # _remote_logging_conn sets both the connection env var and _AIRFLOW_PROCESS_CONTEXT
+ assert new_keys == {expected_env, "_AIRFLOW_PROCESS_CONTEXT"}
else:
assert not new_keys
@@ -2707,6 +2708,78 @@ def test_remote_logging_conn(remote_logging, remote_conn, expected_env, monkeypa
assert connection_available["conn_uri"] is not None, "Connection URI was None during upload"
+def test_remote_logging_conn_sets_process_context(monkeypatch, mocker):
+ """
+ Test that _remote_logging_conn sets _AIRFLOW_PROCESS_CONTEXT=client.
+ """
+ pytest.importorskip("airflow.providers.amazon", reason="'amazon' provider not installed")
+ from airflow.models.connection import Connection as CoreConnection
+ from airflow.sdk.definitions.connection import Connection as SDKConnection
+
+ monkeypatch.delitem(sys.modules, "airflow.logging_config")
+ monkeypatch.delitem(sys.modules, "airflow.config_templates.airflow_local_settings", raising=False)
+
+ conn_id = "s3_conn_logs"
+ conn_uri = "aws:///?region_name=us-east-1"
+
+ def handle_request(request: httpx.Request) -> httpx.Response:
+ return httpx.Response(
+ status_code=200,
+ json={
+ "conn_id": conn_id,
+ "conn_type": "aws",
+ "host": None,
+ "login": None,
+ "password": None,
+ "port": None,
+ "schema": None,
+ "extra": '{"region_name": "us-east-1"}',
+ },
+ )
+
+ with conf_vars(
+ {
+ ("logging", "remote_logging"): "True",
+ ("logging", "remote_base_log_folder"): "s3://bucket/logs",
+ ("logging", "remote_log_conn_id"): conn_id,
+ }
+ ):
+ with conf_vars(
+ {
+ ("logging", "remote_log_conn_id"): conn_id,
+ }
+ ):
+ client = make_client(transport=httpx.MockTransport(handle_request))
+
+ assert os.getenv("_AIRFLOW_PROCESS_CONTEXT") is None
+
+ conn_env_key = f"AIRFLOW_CONN_{conn_id.upper()}"
+
+ with _remote_logging_conn(client):
+ assert os.getenv("_AIRFLOW_PROCESS_CONTEXT") == "client"
+
+ assert conn_env_key in os.environ
+ stored_uri = os.environ[conn_env_key]
+ assert stored_uri == conn_uri
+
+ # Verify that Connection.get() uses SDK Connection class when _AIRFLOW_PROCESS_CONTEXT=client
+ # Without _AIRFLOW_PROCESS_CONTEXT=client, _get_connection_class() would return core
+ # Connection. While core Connection can handle URI deserialization via its __init__,
+ # using SDK Connection ensures consistency and proper behavior in supervisor context.
+ from airflow.sdk.execution_time.context import _get_connection
+
+ retrieved_conn = _get_connection(conn_id)
+
+ assert isinstance(retrieved_conn, SDKConnection)
+ assert not isinstance(retrieved_conn, CoreConnection)
+ assert retrieved_conn.conn_id == conn_id
+ assert retrieved_conn.conn_type == "aws"
+
+ # Verify _AIRFLOW_PROCESS_CONTEXT and env var is cleaned up
+ assert os.getenv("_AIRFLOW_PROCESS_CONTEXT") is None
+ assert conn_env_key not in os.environ
+
+
class TestSignalRetryLogic:
    """Test retry logic for exit codes (signals and non-signal failures) in ActivitySubprocess."""