This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 48a1e5293fc Fix Connection or Variable access in Server context
(#56602)
48a1e5293fc is described below
commit 48a1e5293fc0dbd76d2c2258f796b2a04c4f9bd7
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Oct 14 21:18:24 2025 +0100
Fix Connection or Variable access in Server context (#56602)
Hooks used in API server contexts (plugins, middlewares, log handlers)
previously failed with `ImportError` for `SUPERVISOR_COMMS` because it only
exists in worker task execution contexts (worker, DAG processor, triggerer).
This prevented using hooks like
GCSHook or S3Hook in plugins and broke log retrieval.
Implemented automatic context detection using separate secrets backend
chains for client and server processes:
- Client contexts (workers, DAG processors, triggerers) are detected via
`SUPERVISOR_COMMS` presence and use `ExecutionAPISecretsBackend` to route
through the Execution API
- Server contexts (API server, scheduler, plugins) are detected when
`SUPERVISOR_COMMS` is unavailable and use `MetastoreBackend` for direct
database access
Fixes #56120
Fixes #56583
(cherry picked from commit ae0a330f01dd57d40232a21a59b078d8c94f4bd9)
---
.pre-commit-config.yaml | 1 +
airflow-core/newsfragments/56583.significant.rst | 49 ++++++
airflow-core/src/airflow/api_fastapi/main.py | 4 +
.../src/airflow/jobs/scheduler_job_runner.py | 5 +
airflow-core/src/airflow/secrets/__init__.py | 29 +++-
airflow-core/tests/unit/core/test_configuration.py | 6 +-
task-sdk/src/airflow/sdk/execution_time/context.py | 103 +++++--------
.../airflow/sdk/execution_time/secrets/__init__.py | 14 +-
.../sdk/execution_time/secrets/execution_api.py | 146 ++++++++++++++++++
.../src/airflow/sdk/execution_time/supervisor.py | 43 +++++-
.../tests/task_sdk/definitions/test_connection.py | 4 +-
.../tests/task_sdk/definitions/test_variables.py | 6 +-
.../tests/task_sdk/execution_time/test_context.py | 99 ++++++++++++
.../task_sdk/execution_time/test_context_cache.py | 7 +-
.../tests/task_sdk/execution_time/test_secrets.py | 169 +++++++++++++++++++++
.../task_sdk/execution_time/test_task_runner.py | 2 +-
16 files changed, 599 insertions(+), 88 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index bc4a2ba28ee..3a47832792d 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1613,6 +1613,7 @@ repos:
^airflow-core/src/airflow/operators/subdag\.py$|
^airflow-core/src/airflow/plugins_manager\.py$|
^airflow-core/src/airflow/providers_manager\.py$|
+ ^airflow-core/src/airflow/secrets/__init__.py$|
^airflow-core/src/airflow/serialization/definitions/[_a-z]+\.py$|
^airflow-core/src/airflow/serialization/enums\.py$|
^airflow-core/src/airflow/serialization/helpers\.py$|
diff --git a/airflow-core/newsfragments/56583.significant.rst
b/airflow-core/newsfragments/56583.significant.rst
new file mode 100644
index 00000000000..3e80bfe0cbb
--- /dev/null
+++ b/airflow-core/newsfragments/56583.significant.rst
@@ -0,0 +1,49 @@
+Fix Connection & Variable access in API server contexts (plugins, log handlers)
+
+Previously, hooks used in API server contexts (plugins, middlewares, log
handlers) would fail with an ``ImportError``
+for ``SUPERVISOR_COMMS``, because ``SUPERVISOR_COMMS`` only exists in task
runner child processes.
+
+This has been fixed by implementing automatic context detection with three
separate secrets backend chains:
+
+**Context Detection:**
+
+1. **Client contexts** (task runner in worker): Detected via
``SUPERVISOR_COMMS`` presence
+2. **Server contexts** (API server, scheduler): Explicitly marked with
``_AIRFLOW_PROCESS_CONTEXT=server`` environment variable
+3. **Fallback contexts** (supervisor, unknown contexts): Neither marker
present, uses minimal safe chain
+
+**Backend Chains:**
+
+- **Client**: ``EnvironmentVariablesBackend`` → ``ExecutionAPISecretsBackend``
(routes to Execution API via SUPERVISOR_COMMS)
+- **Server**: ``EnvironmentVariablesBackend`` → ``MetastoreBackend`` (direct
database access)
+- **Fallback**: ``EnvironmentVariablesBackend`` only (+ external backends from
config like AWS Secrets Manager, Vault)
+
+The fallback chain is crucial for supervisor processes (worker-side, before
task runner starts), which need to access
+external secrets for remote logging setup but should not use
``MetastoreBackend`` (to maintain worker isolation).
+
+**Architecture Benefits:**
+
+- Workers (supervisor + task runner) never use ``MetastoreBackend``,
maintaining strict isolation
+- External secrets backends (AWS Secrets Manager, Vault, etc.) work in all
three contexts
+- Supervisor falls back to Execution API client for connections not found in
external backends
+- API server and scheduler have direct database access for optimal performance
+
+**Impact:**
+
+- Hooks like ``GCSHook``, ``S3Hook`` now work correctly in log handlers and
plugins
+- No code changes required for existing plugins or hooks
+- Workers remain isolated from direct database access (network-level DB
blocking fully supported)
+- External secrets work everywhere (workers, supervisor, API server)
+- Robust handling of unknown contexts with safe minimal chain
+
+See: `#56120 <https://github.com/apache/airflow/issues/56120>`__, `#56583
<https://github.com/apache/airflow/issues/56583>`__, `#51816
<https://github.com/apache/airflow/issues/51816>`__
+
+* Types of change
+
+ * [ ] Dag changes
+ * [ ] Config changes
+ * [ ] API changes
+ * [ ] CLI changes
+ * [ ] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [ ] Code interface changes
diff --git a/airflow-core/src/airflow/api_fastapi/main.py
b/airflow-core/src/airflow/api_fastapi/main.py
index 16419b9485e..195f98a5bf3 100644
--- a/airflow-core/src/airflow/api_fastapi/main.py
+++ b/airflow-core/src/airflow/api_fastapi/main.py
@@ -19,6 +19,10 @@ from __future__ import annotations
import os
+# Mark this as a server context before any airflow imports
+# This ensures plugins loaded at import time get the correct secrets backend
chain
+os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
+
from airflow.api_fastapi.app import cached_app
# There is no way to pass the apps to this file from Airflow CLI
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 4e9889e18e2..45574451510 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1020,6 +1020,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
span.add_event(name="airflow.task.ended",
timestamp=datetime_to_nano(ti.end_date))
def _execute(self) -> int | None:
+ import os
+
+ # Mark this as a server context for secrets backend detection
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
+
self.log.info("Starting the scheduler")
reset_signals = self.register_signals()
diff --git a/airflow-core/src/airflow/secrets/__init__.py
b/airflow-core/src/airflow/secrets/__init__.py
index 5ff034b247e..63de92c4f3f 100644
--- a/airflow-core/src/airflow/secrets/__init__.py
+++ b/airflow-core/src/airflow/secrets/__init__.py
@@ -29,7 +29,7 @@ from __future__ import annotations
from airflow.utils.deprecation_tools import add_deprecated_classes
-__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH",
"DEFAULT_SECRETS_SEARCH_PATH_WORKERS"]
+__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH"]
from airflow.secrets.base_secrets import BaseSecretsBackend
@@ -38,10 +38,6 @@ DEFAULT_SECRETS_SEARCH_PATH = [
"airflow.secrets.metastore.MetastoreBackend",
]
-DEFAULT_SECRETS_SEARCH_PATH_WORKERS = [
- "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
-]
-
__deprecated_classes = {
"cache": {
@@ -49,3 +45,26 @@ __deprecated_classes = {
},
}
add_deprecated_classes(__deprecated_classes, __name__)
+
+
+def __getattr__(name):
+ if name == "DEFAULT_SECRETS_SEARCH_PATH_WORKERS":
+ import warnings
+
+ warnings.warn(
+ "airflow.secrets.DEFAULT_SECRETS_SEARCH_PATH_WORKERS is moved to
the Task SDK. "
+ "Use
airflow.sdk.execution_time.secrets.DEFAULT_SECRETS_SEARCH_PATH_WORKERS
instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ try:
+ from airflow.sdk.execution_time.secrets import
DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+
+ return DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+ except (ImportError, AttributeError):
+ # Back-compat for older Task SDK clients
+ return [
+
"airflow.secrets.environment_variables.EnvironmentVariablesBackend",
+ ]
+
+ raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
diff --git a/airflow-core/tests/unit/core/test_configuration.py
b/airflow-core/tests/unit/core/test_configuration.py
index 20a9ce8364d..0735245f509 100644
--- a/airflow-core/tests/unit/core/test_configuration.py
+++ b/airflow-core/tests/unit/core/test_configuration.py
@@ -43,7 +43,7 @@ from airflow.configuration import (
write_default_airflow_configuration_if_needed,
)
from airflow.providers_manager import ProvidersManager
-from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+from airflow.sdk.execution_time.secrets import
DEFAULT_SECRETS_SEARCH_PATH_WORKERS
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.markers import
skip_if_force_lowest_dependencies_marker
@@ -923,8 +923,10 @@ key7 =
backends =
initialize_secrets_backends(DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
backend_classes = [backend.__class__.__name__ for backend in backends]
- assert len(backends) == 2
+ assert len(backends) == 3
assert "SystemsManagerParameterStoreBackend" in backend_classes
+ assert "EnvironmentVariablesBackend" in backend_classes
+ assert "ExecutionAPISecretsBackend" in backend_classes
@skip_if_force_lowest_dependencies_marker
@conf_vars(
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py
b/task-sdk/src/airflow/sdk/execution_time/context.py
index c7ddf60fd60..ae916aefea0 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -149,7 +149,8 @@ def _get_connection(conn_id: str) -> Connection:
except SecretCache.NotPresentException:
pass # continue to backends
- # iterate over configured backends if not in cache (or expired)
+ # Iterate over configured backends (which may include
SupervisorCommsSecretsBackend
+ # in worker contexts or MetastoreBackend in API server contexts)
backends = ensure_secrets_backend_loaded()
for secrets_backend in backends:
try:
@@ -165,26 +166,10 @@ def _get_connection(conn_id: str) -> Connection:
type(secrets_backend).__name__,
)
- if backends:
- log.debug(
- "Connection not found in any of the configured Secrets Backends.
Trying to retrieve from API server",
- conn_id=conn_id,
- )
-
- # TODO: This should probably be moved to a separate module like
`airflow.sdk.execution_time.comms`
- # or `airflow.sdk.execution_time.connection`
- # A reason to not move it to `airflow.sdk.execution_time.comms` is that
it
- # will make that module depend on Task SDK, which is not ideal because
we intend to
- # keep Task SDK as a separate package than execution time mods.
- # Also applies to _async_get_connection.
- from airflow.sdk.execution_time.comms import GetConnection
- from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
-
- msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
+ # If no backend found the connection, raise an error
+ from airflow.exceptions import AirflowNotFoundException
- conn = _process_connection_result_conn(msg)
- SecretCache.save_connection_uri(conn_id, conn.get_uri())
- return conn
+ raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
async def _async_get_connection(conn_id: str) -> Connection:
@@ -201,34 +186,36 @@ async def _async_get_connection(conn_id: str) ->
Connection:
_mask_connection_secrets(conn)
return conn
except SecretCache.NotPresentException:
- pass # continue to API
+ pass # continue to backends
- from airflow.sdk.execution_time.comms import GetConnection
from airflow.sdk.execution_time.supervisor import
ensure_secrets_backend_loaded
- from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
- # Try secrets backends first using async wrapper
+ # Try secrets backends
backends = ensure_secrets_backend_loaded()
for secrets_backend in backends:
try:
- conn = await
sync_to_async(secrets_backend.get_connection)(conn_id) # type:
ignore[assignment]
+ # Use async method if available, otherwise wrap sync method
+ if hasattr(secrets_backend, "aget_connection"):
+ conn = await secrets_backend.aget_connection(conn_id) # type:
ignore[assignment]
+ else:
+ conn = await
sync_to_async(secrets_backend.get_connection)(conn_id) # type:
ignore[assignment]
+
if conn:
- # TODO: this should probably be in get conn
- if conn.password:
- mask_secret(conn.password)
- if conn.extra:
- mask_secret(conn.extra)
+ SecretCache.save_connection_uri(conn_id, conn.get_uri())
+ _mask_connection_secrets(conn)
return conn
except Exception:
# If one backend fails, try the next one
- continue
+ log.exception(
+ "Unable to retrieve connection from secrets backend (%s). "
+ "Checking subsequent secrets backend.",
+ type(secrets_backend).__name__,
+ )
+
+ # If no backend found the connection, raise an error
+ from airflow.exceptions import AirflowNotFoundException
- # If no secrets backend has the connection, fall back to API server
- msg = await SUPERVISOR_COMMS.asend(GetConnection(conn_id=conn_id))
- conn = _process_connection_result_conn(msg)
- SecretCache.save_connection_uri(conn_id, conn.get_uri())
- _mask_connection_secrets(conn)
- return conn
+ raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
def _get_variable(key: str, deserialize_json: bool) -> Any:
@@ -250,7 +237,8 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
pass # Continue to check backends
backends = ensure_secrets_backend_loaded()
- # iterate over backends if not in cache (or expired)
+
+ # Iterate over backends if not in cache (or expired)
for secrets_backend in backends:
try:
var_val = secrets_backend.get_variable(key=key)
@@ -270,31 +258,13 @@ def _get_variable(key: str, deserialize_json: bool) ->
Any:
type(secrets_backend).__name__,
)
- if backends:
- log.debug(
- "Variable not found in any of the configured Secrets Backends.
Trying to retrieve from API server",
- key=key,
- )
-
- # TODO: This should probably be moved to a separate module like
`airflow.sdk.execution_time.comms`
- # or `airflow.sdk.execution_time.variable`
- # A reason to not move it to `airflow.sdk.execution_time.comms` is that
it
- # will make that module depend on Task SDK, which is not ideal because
we intend to
- # keep Task SDK as a separate package than execution time mods.
- from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable
- from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
-
- msg = SUPERVISOR_COMMS.send(GetVariable(key=key))
-
- if isinstance(msg, ErrorResponse):
- raise AirflowRuntimeError(msg)
+ # If no backend found the variable, raise a not found error (mirrors
_get_connection)
+ from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
+ from airflow.sdk.execution_time.comms import ErrorResponse
- if TYPE_CHECKING:
- assert isinstance(msg, VariableResult)
- variable = _convert_variable_result_to_variable(msg, deserialize_json)
- # Save raw value to ensure cache consistency regardless of
deserialize_json parameter
- SecretCache.save_variable(key, msg.value)
- return variable.value
+ raise AirflowRuntimeError(
+ ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message":
f"Variable {key} not found"})
+ )
def _set_variable(key: str, value: Any, description: str | None = None,
serialize_json: bool = False) -> None:
@@ -307,18 +277,21 @@ def _set_variable(key: str, value: Any, description: str
| None = None, serializ
from airflow.sdk.execution_time.cache import SecretCache
from airflow.sdk.execution_time.comms import PutVariable
+ from airflow.sdk.execution_time.secrets.execution_api import
ExecutionAPISecretsBackend
from airflow.sdk.execution_time.supervisor import
ensure_secrets_backend_loaded
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
# check for write conflicts on the worker
for secrets_backend in ensure_secrets_backend_loaded():
+ if isinstance(secrets_backend, ExecutionAPISecretsBackend):
+ continue
try:
var_val = secrets_backend.get_variable(key=key)
if var_val is not None:
_backend_name = type(secrets_backend).__name__
log.warning(
"The variable %s is defined in the %s secrets backend,
which takes "
- "precedence over reading from the database. The value in
the database will be "
+ "precedence over reading from the API Server. The value
from the API Server will be "
"updated, but to read it you have to delete the
conflicting variable "
"from %s",
key,
@@ -379,12 +352,16 @@ class ConnectionAccessor:
return True
def get(self, conn_id: str, default_conn: Any = None) -> Any:
+ from airflow.exceptions import AirflowNotFoundException
+
try:
return _get_connection(conn_id)
except AirflowRuntimeError as e:
if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
return default_conn
raise
+ except AirflowNotFoundException:
+ return default_conn
class VariableAccessor:
diff --git a/airflow-core/src/airflow/api_fastapi/main.py
b/task-sdk/src/airflow/sdk/execution_time/secrets/__init__.py
similarity index 66%
copy from airflow-core/src/airflow/api_fastapi/main.py
copy to task-sdk/src/airflow/sdk/execution_time/secrets/__init__.py
index 16419b9485e..26c6e744de4 100644
--- a/airflow-core/src/airflow/api_fastapi/main.py
+++ b/task-sdk/src/airflow/sdk/execution_time/secrets/__init__.py
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,14 +15,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""Secrets backends for task execution contexts."""
from __future__ import annotations
-import os
+from airflow.sdk.execution_time.secrets.execution_api import
ExecutionAPISecretsBackend
-from airflow.api_fastapi.app import cached_app
+__all__ = ["ExecutionAPISecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH_WORKERS"]
-# There is no way to pass the apps to this file from Airflow CLI
-# because fastapi dev command does not accept any additional arguments
-# so environment variable is being used to pass it
-app = cached_app(apps=os.environ.get("AIRFLOW_API_APPS", "all"))
+DEFAULT_SECRETS_SEARCH_PATH_WORKERS = [
+ "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
+
"airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend",
+]
diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
new file mode 100644
index 00000000000..8f32282c2bc
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
@@ -0,0 +1,146 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Secrets backend that routes requests to the Execution API."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.secrets.base_secrets import BaseSecretsBackend
+
+if TYPE_CHECKING:
+ from airflow.sdk import Connection
+
+
+class ExecutionAPISecretsBackend(BaseSecretsBackend):
+ """
+ Secrets backend for client contexts (workers, DAG processors, triggerers).
+
+ Routes connection and variable requests through SUPERVISOR_COMMS to the
+ Execution API server. This backend should only be registered in client
+ processes, not in API server/scheduler processes.
+ """
+
+ def get_conn_value(self, conn_id: str) -> str | None:
+ """
+ Get connection URI via SUPERVISOR_COMMS.
+
+ Not used since we override get_connection directly.
+ """
+ raise NotImplementedError("Use get_connection instead")
+
+ def get_connection(self, conn_id: str) -> Connection | None: # type:
ignore[override]
+ """
+ Return connection object by routing through SUPERVISOR_COMMS.
+
+ :param conn_id: connection id
+ :return: Connection object or None if not found
+ """
+ from airflow.sdk.execution_time.comms import ErrorResponse,
GetConnection
+ from airflow.sdk.execution_time.context import
_process_connection_result_conn
+ from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+ try:
+ msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
+
+ if isinstance(msg, ErrorResponse):
+ # Connection not found or error occurred
+ return None
+
+ # Convert ExecutionAPI response to SDK Connection
+ return _process_connection_result_conn(msg)
+ except Exception:
+ # If SUPERVISOR_COMMS fails for any reason, return None
+ # to allow fallback to other backends
+ return None
+
+ def get_variable(self, key: str) -> str | None:
+ """
+ Return variable value by routing through SUPERVISOR_COMMS.
+
+ :param key: Variable key
+ :return: Variable value or None if not found
+ """
+ from airflow.sdk.execution_time.comms import ErrorResponse,
GetVariable, VariableResult
+ from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+ try:
+ msg = SUPERVISOR_COMMS.send(GetVariable(key=key))
+
+ if isinstance(msg, ErrorResponse):
+ # Variable not found or error occurred
+ return None
+
+ # Extract value from VariableResult
+ if isinstance(msg, VariableResult):
+ return msg.value # Already a string | None
+ return None
+ except Exception:
+ # If SUPERVISOR_COMMS fails for any reason, return None
+ # to allow fallback to other backends
+ return None
+
+ async def aget_connection(self, conn_id: str) -> Connection | None: #
type: ignore[override]
+ """
+ Return connection object asynchronously via SUPERVISOR_COMMS.
+
+ :param conn_id: connection id
+ :return: Connection object or None if not found
+ """
+ from airflow.sdk.execution_time.comms import ErrorResponse,
GetConnection
+ from airflow.sdk.execution_time.context import
_process_connection_result_conn
+ from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+ try:
+ msg = await SUPERVISOR_COMMS.asend(GetConnection(conn_id=conn_id))
+
+ if isinstance(msg, ErrorResponse):
+ # Connection not found or error occurred
+ return None
+
+ # Convert ExecutionAPI response to SDK Connection
+ return _process_connection_result_conn(msg)
+ except Exception:
+ # If SUPERVISOR_COMMS fails for any reason, return None
+ # to allow fallback to other backends
+ return None
+
+ async def aget_variable(self, key: str) -> str | None:
+ """
+ Return variable value asynchronously via SUPERVISOR_COMMS.
+
+ :param key: Variable key
+ :return: Variable value or None if not found
+ """
+ from airflow.sdk.execution_time.comms import ErrorResponse,
GetVariable, VariableResult
+ from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+ try:
+ msg = await SUPERVISOR_COMMS.asend(GetVariable(key=key))
+
+ if isinstance(msg, ErrorResponse):
+ # Variable not found or error occurred
+ return None
+
+ # Extract value from VariableResult
+ if isinstance(msg, VariableResult):
+ return msg.value # Already a string | None
+ return None
+ except Exception:
+ # If SUPERVISOR_COMMS fails for any reason, return None
+ # to allow fallback to other backends
+ return None
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 797400c8508..632ceb9904b 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1755,13 +1755,48 @@ def forward_to_log(
def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]:
- """Initialize the secrets backend on workers."""
+ """
+ Initialize secrets backend with auto-detected context.
+
+ Detection strategy:
+ 1. SUPERVISOR_COMMS exists and is set → client chain
(ExecutionAPISecretsBackend)
+ 2. _AIRFLOW_PROCESS_CONTEXT=server env var → server chain
(MetastoreBackend)
+ 3. Neither → fallback chain (only env vars + external backends, no
MetastoreBackend)
+
+ Client contexts: task runner in worker (has SUPERVISOR_COMMS)
+ Server contexts: API server, scheduler (set
_AIRFLOW_PROCESS_CONTEXT=server)
+ Fallback contexts: supervisor, unknown contexts (no SUPERVISOR_COMMS, no
env var)
+
+ The fallback chain ensures supervisor can use external secrets (AWS
Secrets Manager,
+ Vault, etc.) while falling back to API client, without trying
MetastoreBackend.
+ """
+ import os
+
from airflow.configuration import ensure_secrets_loaded
- from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+ from airflow.sdk.execution_time.secrets import
DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+
+ # 1. Check for client context (SUPERVISOR_COMMS)
+ try:
+ from airflow.sdk.execution_time import task_runner
- backends =
ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
+ if hasattr(task_runner, "SUPERVISOR_COMMS") and
task_runner.SUPERVISOR_COMMS is not None:
+ # Client context: task runner with SUPERVISOR_COMMS
+ return
ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
+ except (ImportError, AttributeError):
+ pass
- return backends
+ # 2. Check for explicit server context
+ if os.environ.get("_AIRFLOW_PROCESS_CONTEXT") == "server":
+ # Server context: API server, scheduler
+ # uses the default server list
+ return ensure_secrets_loaded()
+
+ # 3. Fallback for unknown contexts (supervisor, etc.)
+ # Only env vars + external backends from config, no MetastoreBackend, no
ExecutionAPISecretsBackend
+ fallback_backends = [
+ "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
+ ]
+ return ensure_secrets_loaded(default_backends=fallback_backends)
def _configure_logging(log_path: str, client: Client) ->
tuple[FilteringBoundLogger, BinaryIO | TextIO]:
diff --git a/task-sdk/tests/task_sdk/definitions/test_connection.py
b/task-sdk/tests/task_sdk/definitions/test_connection.py
index eff5ed1e02b..73bc938abc1 100644
--- a/task-sdk/tests/task_sdk/definitions/test_connection.py
+++ b/task-sdk/tests/task_sdk/definitions/test_connection.py
@@ -28,7 +28,7 @@ from airflow.exceptions import AirflowException,
AirflowNotFoundException
from airflow.sdk import Connection
from airflow.sdk.exceptions import ErrorType
from airflow.sdk.execution_time.comms import ConnectionResult, ErrorResponse
-from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+from airflow.sdk.execution_time.secrets import
DEFAULT_SECRETS_SEARCH_PATH_WORKERS
from tests_common.test_utils.config import conf_vars
@@ -265,7 +265,7 @@ class TestConnectionsFromSecrets:
mock_env_get.return_value = Connection(conn_id="something",
conn_type="some-type")
backends =
initialize_secrets_backends(DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
- assert len(backends) == 2
+ assert len(backends) == 3
backend_classes = [backend.__class__.__name__ for backend in backends]
assert "LocalFilesystemBackend" in backend_classes
diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py
b/task-sdk/tests/task_sdk/definitions/test_variables.py
index c85924df6a6..7ab388d6d50 100644
--- a/task-sdk/tests/task_sdk/definitions/test_variables.py
+++ b/task-sdk/tests/task_sdk/definitions/test_variables.py
@@ -26,7 +26,7 @@ import pytest
from airflow.configuration import initialize_secrets_backends
from airflow.sdk import Variable
from airflow.sdk.execution_time.comms import PutVariable, VariableResult
-from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+from airflow.sdk.execution_time.secrets import
DEFAULT_SECRETS_SEARCH_PATH_WORKERS
from tests_common.test_utils.config import conf_vars
@@ -176,9 +176,11 @@ class TestVariableFromSecrets:
mock_env_get.return_value = "fake_value"
backends =
initialize_secrets_backends(DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
- assert len(backends) == 2
+ # LocalFilesystemBackend (custom), EnvironmentVariablesBackend,
ExecutionAPISecretsBackend
+ assert len(backends) == 3
backend_classes = [backend.__class__.__name__ for backend in backends]
assert "LocalFilesystemBackend" in backend_classes
+ assert "ExecutionAPISecretsBackend" in backend_classes
var = Variable.get(key="fake_var_key")
# mock_env is only called when LocalFilesystemBackend doesn't have it
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py
b/task-sdk/tests/task_sdk/execution_time/test_context.py
index bd264286a5c..df048d1bf48 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -60,10 +60,12 @@ from airflow.sdk.execution_time.context import (
_AssetRefResolutionMixin,
_async_get_connection,
_convert_variable_result_to_variable,
+ _get_connection,
_process_connection_result_conn,
context_to_airflow_vars,
set_current_context,
)
+from airflow.sdk.execution_time.secrets import ExecutionAPISecretsBackend
def test_convert_connection_result_conn():
@@ -765,3 +767,100 @@ class TestAsyncGetConnection:
# Should not have tried SUPERVISOR_COMMS since secrets backend had
the connection
mock_supervisor_comms.send.assert_not_called()
mock_supervisor_comms.asend.assert_not_called()
+
+
+class TestSecretsBackend:
+ """Test that connection resolution uses the backend chain correctly."""
+
+ def test_execution_api_backend_in_worker_chain(self):
+ """Test that ExecutionAPISecretsBackend is in the worker search
path."""
+ from airflow.sdk.execution_time.secrets import
DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+
+ assert (
+
"airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend"
+ in DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+ )
+
+ def test_metastore_backend_in_server_chain(self):
+ """Test that MetastoreBackend is in the API server search path."""
+ from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH
+
+ assert "airflow.secrets.metastore.MetastoreBackend" in
DEFAULT_SECRETS_SEARCH_PATH
+ assert (
+
"airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend"
+ not in DEFAULT_SECRETS_SEARCH_PATH
+ )
+
+ def test_get_connection_uses_backend_chain(self, mock_supervisor_comms):
+ """Test that _get_connection properly iterates through backends."""
+ from airflow.sdk.api.datamodels._generated import ConnectionResponse
+ from airflow.sdk.execution_time.comms import ConnectionResult
+
+ # Mock connection response
+ conn_response = ConnectionResponse(
+ conn_id="test_conn",
+ conn_type="http",
+ host="example.com",
+ port=443,
+ )
+ conn_result = ConnectionResult.from_conn_response(conn_response)
+ mock_supervisor_comms.send.return_value = conn_result
+
+        # Mock the backend loading to include our ExecutionAPISecretsBackend
+ supervisor_backend = ExecutionAPISecretsBackend()
+
+ with
patch("airflow.sdk.execution_time.supervisor.ensure_secrets_backend_loaded") as
mock_load:
+ mock_load.return_value = [supervisor_backend]
+
+ conn = _get_connection("test_conn")
+
+ assert conn is not None
+ assert conn.conn_id == "test_conn"
+ assert conn.host == "example.com"
+ mock_supervisor_comms.send.assert_called_once()
+
+ def test_get_connection_backend_fallback(self, mock_supervisor_comms):
+ """Test that _get_connection falls through backends correctly."""
+ from airflow.sdk.api.datamodels._generated import ConnectionResponse
+ from airflow.sdk.execution_time.comms import ConnectionResult
+
+ # First backend returns nothing (simulating env var backend with no
env var)
+ class EmptyBackend:
+ def get_connection(self, conn_id):
+ return None
+
+ # Second backend returns the connection
+ conn_response = ConnectionResponse(
+ conn_id="test_conn",
+ conn_type="postgres",
+ host="db.example.com",
+ )
+ conn_result = ConnectionResult.from_conn_response(conn_response)
+ mock_supervisor_comms.send.return_value = conn_result
+
+ supervisor_backend = ExecutionAPISecretsBackend()
+
+ with
patch("airflow.sdk.execution_time.supervisor.ensure_secrets_backend_loaded") as
mock_load:
+ mock_load.return_value = [EmptyBackend(), supervisor_backend]
+
+ conn = _get_connection("test_conn")
+
+ assert conn is not None
+ assert conn.conn_id == "test_conn"
+ # SupervisorComms backend was called (first backend returned None)
+ mock_supervisor_comms.send.assert_called_once()
+
+ def test_get_connection_not_found_raises_error(self,
mock_supervisor_comms):
+ """Test that _get_connection raises error when no backend finds
connection."""
+ from airflow.exceptions import AirflowNotFoundException
+
+ # Backend returns None (not found)
+ class EmptyBackend:
+ def get_connection(self, conn_id):
+ return None
+
+ with
patch("airflow.sdk.execution_time.supervisor.ensure_secrets_backend_loaded") as
mock_load:
+ mock_load.return_value = [EmptyBackend()]
+
+ with pytest.raises(AirflowNotFoundException, match="isn't
defined"):
+ _get_connection("nonexistent_conn")
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context_cache.py
b/task-sdk/tests/task_sdk/execution_time/test_context_cache.py
index 591bdd617b6..d415c0334a4 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context_cache.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context_cache.py
@@ -29,6 +29,7 @@ from airflow.sdk.execution_time.context import (
_get_variable,
_set_variable,
)
+from airflow.sdk.execution_time.secrets import ExecutionAPISecretsBackend
from tests_common.test_utils.config import conf_vars
@@ -98,7 +99,7 @@ class TestConnectionCacheIntegration:
password="pass",
)
- mock_ensure_backends.return_value = []
+ mock_ensure_backends.return_value = [ExecutionAPISecretsBackend()]
mock_supervisor_comms.send.return_value = conn_result
@@ -188,7 +189,7 @@ class TestVariableCacheIntegration:
value = "test_value"
var_result = VariableResult(key=key, value=value)
- mock_ensure_backends.return_value = []
+ mock_ensure_backends.return_value = [ExecutionAPISecretsBackend()]
mock_supervisor_comms.send.return_value = var_result
result = _get_variable(key, deserialize_json=False)
@@ -316,7 +317,7 @@ class TestCacheDisabled:
conn_id = "test_conn"
conn_result = ConnectionResult(conn_id=conn_id, conn_type="mysql",
host="host")
- mock_ensure_backends.return_value = []
+ mock_ensure_backends.return_value = [ExecutionAPISecretsBackend()]
mock_supervisor_comms.send.return_value = conn_result
diff --git a/task-sdk/tests/task_sdk/execution_time/test_secrets.py
b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
new file mode 100644
index 00000000000..bda87a6a64a
--- /dev/null
+++ b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pytest
+
+from airflow.sdk.execution_time.secrets.execution_api import
ExecutionAPISecretsBackend
+
+
+class TestExecutionAPISecretsBackend:
+    """Tests for ExecutionAPISecretsBackend, which proxies lookups over SUPERVISOR_COMMS."""
+
+    def test_get_connection_via_supervisor_comms(self, mock_supervisor_comms):
+        """Test that connection is retrieved via SUPERVISOR_COMMS."""
+        from airflow.sdk.api.datamodels._generated import ConnectionResponse
+        from airflow.sdk.execution_time.comms import ConnectionResult
+
+        # Stub the supervisor reply with a full connection payload
+        conn_response = ConnectionResponse(
+            conn_id="test_conn",
+            conn_type="http",
+            host="example.com",
+            port=443,
+            schema="https",
+        )
+        conn_result = ConnectionResult.from_conn_response(conn_response)
+        mock_supervisor_comms.send.return_value = conn_result
+
+        backend = ExecutionAPISecretsBackend()
+        conn = backend.get_connection("test_conn")
+
+        assert conn is not None
+        assert conn.conn_id == "test_conn"
+        assert conn.conn_type == "http"
+        assert conn.host == "example.com"
+        mock_supervisor_comms.send.assert_called_once()
+
+    def test_get_connection_not_found(self, mock_supervisor_comms):
+        """Test that None is returned when connection not found."""
+        from airflow.sdk.exceptions import ErrorType
+        from airflow.sdk.execution_time.comms import ErrorResponse
+
+        # Supervisor answers with a CONNECTION_NOT_FOUND error response
+        error_response = ErrorResponse(error=ErrorType.CONNECTION_NOT_FOUND,
detail={"message": "Not found"})
+        mock_supervisor_comms.send.return_value = error_response
+
+        backend = ExecutionAPISecretsBackend()
+        conn = backend.get_connection("nonexistent")
+
+        assert conn is None
+        mock_supervisor_comms.send.assert_called_once()
+
+    def test_get_variable_via_supervisor_comms(self, mock_supervisor_comms):
+        """Test that variable is retrieved via SUPERVISOR_COMMS."""
+        from airflow.sdk.execution_time.comms import VariableResult
+
+        # Stub the supervisor reply with a variable payload
+        var_result = VariableResult(key="test_var", value="test_value")
+        mock_supervisor_comms.send.return_value = var_result
+
+        backend = ExecutionAPISecretsBackend()
+        value = backend.get_variable("test_var")
+
+        assert value == "test_value"
+        mock_supervisor_comms.send.assert_called_once()
+
+    def test_get_variable_not_found(self, mock_supervisor_comms):
+        """Test that None is returned when variable not found."""
+        from airflow.sdk.exceptions import ErrorType
+        from airflow.sdk.execution_time.comms import ErrorResponse
+
+        # Supervisor answers with a VARIABLE_NOT_FOUND error response
+        error_response = ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND,
detail={"message": "Not found"})
+        mock_supervisor_comms.send.return_value = error_response
+
+        backend = ExecutionAPISecretsBackend()
+        value = backend.get_variable("nonexistent")
+
+        assert value is None
+        mock_supervisor_comms.send.assert_called_once()
+
+    def test_get_connection_handles_exception(self, mock_supervisor_comms):
+        """Test that exceptions are handled gracefully."""
+        mock_supervisor_comms.send.side_effect = RuntimeError("Connection
failed")
+
+        backend = ExecutionAPISecretsBackend()
+        conn = backend.get_connection("test_conn")
+
+        # Must return None (not raise) so the secrets chain can try the next backend
+        assert conn is None
+
+    def test_get_variable_handles_exception(self, mock_supervisor_comms):
+        """Test that exceptions are handled gracefully for variables."""
+        mock_supervisor_comms.send.side_effect = RuntimeError("Communication
failed")
+
+        backend = ExecutionAPISecretsBackend()
+        value = backend.get_variable("test_var")
+
+        # Must return None (not raise) so the secrets chain can try the next backend
+        assert value is None
+
+    def test_get_conn_value_not_implemented(self):
+        """Test that get_conn_value raises NotImplementedError."""
+        backend = ExecutionAPISecretsBackend()
+        with pytest.raises(NotImplementedError, match="Use get_connection
instead"):
+            backend.get_conn_value("test_conn")
+
+
+class TestContextDetection:
+    """ensure_secrets_backend_loaded picks the backend chain matching the process context."""
+
+    def test_client_context_with_supervisor_comms(self, mock_supervisor_comms):
+        """Client context: SUPERVISOR_COMMS set → uses worker chain."""
+        from airflow.sdk.execution_time.supervisor import
ensure_secrets_backend_loaded
+
+        backends = ensure_secrets_backend_loaded()
+        backend_classes = [type(b).__name__ for b in backends]
+        assert "ExecutionAPISecretsBackend" in backend_classes
+        assert "MetastoreBackend" not in backend_classes
+
+    def test_server_context_with_env_var(self, monkeypatch):
+        """Server context: env var set → uses server chain."""
+        import sys
+
+        from airflow.sdk.execution_time.supervisor import
ensure_secrets_backend_loaded
+
+        monkeypatch.setenv("_AIRFLOW_PROCESS_CONTEXT", "server")
+        # Drop task_runner from sys.modules so SUPERVISOR_COMMS cannot be found
+        if "airflow.sdk.execution_time.task_runner" in sys.modules:
+            monkeypatch.delitem(sys.modules,
"airflow.sdk.execution_time.task_runner")
+
+        backends = ensure_secrets_backend_loaded()
+        backend_classes = [type(b).__name__ for b in backends]
+        assert "MetastoreBackend" in backend_classes
+        assert "ExecutionAPISecretsBackend" not in backend_classes
+
+    def test_fallback_context_no_markers(self, monkeypatch):
+        """Fallback context: no SUPERVISOR_COMMS, no env var → only env vars +
external."""
+        import sys
+
+        from airflow.sdk.execution_time.supervisor import
ensure_secrets_backend_loaded
+
+        # Drop task_runner from sys.modules so SUPERVISOR_COMMS cannot be found
+        if "airflow.sdk.execution_time.task_runner" in sys.modules:
+            monkeypatch.delitem(sys.modules,
"airflow.sdk.execution_time.task_runner")
+
+        # Clear the server-context marker as well
+        monkeypatch.delenv("_AIRFLOW_PROCESS_CONTEXT", raising=False)
+
+        backends = ensure_secrets_backend_loaded()
+        backend_classes = [type(b).__name__ for b in backends]
+        assert "EnvironmentVariablesBackend" in backend_classes
+        assert "MetastoreBackend" not in backend_classes
+        assert "ExecutionAPISecretsBackend" not in backend_classes
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index fad6e4d66e8..930d80590ea 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -1384,7 +1384,7 @@ class TestRuntimeTaskInstance:
# Access the variable from the context
var_from_context = context["var"][accessor_type].test_key
-
mock_supervisor_comms.send.assert_called_once_with(GetVariable(key="test_key"))
+ mock_supervisor_comms.send.assert_any_call(GetVariable(key="test_key"))
assert var_from_context == expected_value