This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 41f20d5f6c5 Remote logging fix (#68370)
41f20d5f6c5 is described below
commit 41f20d5f6c5504c62cffe09f24abfbd88edd252d
Author: Andrei Leib <[email protected]>
AuthorDate: Mon Jun 15 23:25:45 2026 -0400
Remote logging fix (#68370)
---
.../src/airflow/sdk/execution_time/supervisor.py | 2 +-
.../task_sdk/execution_time/test_supervisor.py | 32 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index c4b9c72420a..d4617efbbae 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1180,10 +1180,10 @@ def _fetch_remote_logging_conn(conn_id: str, client: Client) -> Connection | Non
from airflow.sdk.definitions.connection import Connection
result: Connection | None =
Connection(**conn_result.model_dump(exclude={"type"}, by_alias=True))
+ _REMOTE_LOGGING_CONN_CACHE[conn_id] = result
else:
result = None
- _REMOTE_LOGGING_CONN_CACHE[conn_id] = result
return result
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 57ac9f39b50..48932581954 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -61,6 +61,7 @@ from airflow.sdk.api.datamodels._generated import (
AssetEventResponse,
AssetProfile,
AssetResponse,
+ ConnectionResponse,
DagRun,
DagRunState,
DagRunType,
@@ -3819,6 +3820,37 @@ def test_remote_logging_conn_caches_connection_not_client(monkeypatch):
assert all(ref() is None for ref in clients), "Client instances should
be garbage collected"
+def test_fetch_remote_logging_conn_does_not_cache_none_result(mocker):
+ """Test that connection caching doesn't cache failed lookups as None."""
+ conn_id = "test_conn"
+ client = mocker.Mock()
+ mocker.patch.object(supervisor, "ensure_secrets_backend_loaded",
return_value=[])
+ mocker.patch.dict(supervisor._REMOTE_LOGGING_CONN_CACHE, {}, clear=True)
+ client.connections.get.side_effect = [
+ ErrorResponse(error=ErrorType.PERMISSION_DENIED),
+ ConnectionResponse(
+ conn_id=conn_id,
+ conn_type="example",
+ host=None,
+ schema_=None,
+ login=None,
+ password=None,
+ port=None,
+ extra=None,
+ ),
+ ]
+
+ assert supervisor._fetch_remote_logging_conn(conn_id, client) is None
+ assert conn_id not in supervisor._REMOTE_LOGGING_CONN_CACHE
+
+ second_call_result = supervisor._fetch_remote_logging_conn(conn_id, client)
+ assert second_call_result is not None
+ assert second_call_result.conn_id == conn_id
+ assert supervisor._REMOTE_LOGGING_CONN_CACHE[conn_id] is not None
+ # The first call resulted in None and was not cached, so the second fetch calls the API again.
+ assert client.connections.get.call_count == 2
+
+
def test_process_log_messages_from_subprocess(monkeypatch, caplog):
from airflow.sdk._shared.logging.structlog import PER_LOGGER_LEVELS