This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e94cc4dd96e Task SDK: Add Variable.keys() to list variable keys by prefix (#66022)
e94cc4dd96e is described below
commit e94cc4dd96e3e09a3360529abe6b07008eb5260b
Author: Jun Yeong Kim <[email protected]>
AuthorDate: Thu May 14 01:44:59 2026 +0900
Task SDK: Add Variable.keys() to list variable keys by prefix (#66022)
* Task SDK: Add Variable.keys() to list variable keys by prefix
* Task SDK: Make Variable.keys() return a lazy proxy
Per dev-list feedback, wrap the result in lazy_object_proxy.Proxy so the
Execution API call only happens on first access (iteration, indexing,
len(), etc.) and the result is cached for subsequent accesses. This matches
the pattern already used for template context values.
Also clarifies in the docstring that only keys stored in the metadata
database are returned — secrets backends are not consulted.
* fixup! Task SDK: Make Variable.keys() return a lazy proxy
* fixup! fixup! Task SDK: Make Variable.keys() return a lazy proxy
---
.../execution_api/datamodels/variable.py | 7 ++
.../api_fastapi/execution_api/routes/variables.py | 55 ++++++++++-
.../api_fastapi/execution_api/versions/__init__.py | 2 +
.../variable.py => versions/v2026_06_30.py} | 18 +---
.../src/airflow/dag_processing/processor.py | 8 ++
.../src/airflow/jobs/triggerer_job_runner.py | 7 ++
.../execution_api/versions/head/test_variables.py | 64 +++++++++++++
.../versions/v2026_06_30/__init__.py} | 20 ----
.../versions/v2026_06_30/test_variables.py} | 21 ++---
task-sdk/src/airflow/sdk/api/client.py | 9 ++
.../src/airflow/sdk/api/datamodels/_generated.py | 14 ++-
task-sdk/src/airflow/sdk/definitions/variable.py | 25 +++++
.../sdk/execution_time/callback_supervisor.py | 6 +-
task-sdk/src/airflow/sdk/execution_time/comms.py | 15 +++
task-sdk/src/airflow/sdk/execution_time/context.py | 29 ++++++
.../airflow/sdk/execution_time/request_handlers.py | 13 +++
.../src/airflow/sdk/execution_time/supervisor.py | 4 +
.../tests/task_sdk/definitions/test_variables.py | 105 ++++++++++++++++++++-
.../execution_time/test_callback_supervisor.py | 11 +++
.../task_sdk/execution_time/test_supervisor.py | 16 ++++
20 files changed, 397 insertions(+), 52 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
index fd49a5eae46..c971a746f2c 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
@@ -34,3 +34,10 @@ class VariablePostBody(StrictBaseModel):
value: str | None = Field(alias="val")
description: str | None = Field(default=None)
+
+
+class VariableKeysResponse(StrictBaseModel):
+ """Variable keys schema for list responses."""
+
+ keys: list[str]
+ total_entries: int
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
index a0a7cb56045..631ad35ded1 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
@@ -20,9 +20,12 @@ from __future__ import annotations
import logging
from typing import Annotated
-from fastapi import APIRouter, Depends, HTTPException, Path, Request, status
+from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request,
status
+from sqlalchemy import func, select
+from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.execution_api.datamodels.variable import (
+ VariableKeysResponse,
VariablePostBody,
VariableResponse,
)
@@ -54,17 +57,55 @@ async def has_variable_access(
return True
-router = APIRouter(
- responses={status.HTTP_404_NOT_FOUND: {"description": "Variable not
found"}},
- dependencies=[Depends(has_variable_access)],
-)
+router = APIRouter()
log = logging.getLogger(__name__)
+# /keys must be declared before /{variable_key:path} so the static path is
+# matched first; otherwise the catch-all path param would swallow it.
+# has_variable_access is applied per-route below (not at router level) because
+# it requires a variable_key path parameter that /keys does not have.
[email protected](
+ "/keys",
+ responses={
+ status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
+ },
+)
+def get_variable_keys(
+ session: SessionDep,
+ team_name: Annotated[str | None, Depends(get_team_name_dep)] = None,
+ prefix: Annotated[str | None, Query()] = None,
+ limit: Annotated[int, Query(ge=1, le=10_000)] = 1000,
+ offset: Annotated[int, Query(ge=0)] = 0,
+) -> VariableKeysResponse:
+ """
+ Get Airflow Variable keys, optionally filtered by prefix.
+
+ .. note::
+ This endpoint deliberately bypasses the per-variable
``has_variable_access``
+ check, since access scoping requires a specific variable key. Any
authenticated
+ task within a team can therefore enumerate every variable key in that
team —
+ including keys for variables it would not be allowed to read. This is
consistent
+ with Airflow's security model (workers within a deployment trust each
other),
+ but the asymmetry between key enumeration and value access is
intentional.
+ """
+ stmt = select(Variable.key).order_by(Variable.key)
+ if prefix is not None:
+ stmt = stmt.where(Variable.key.startswith(prefix, autoescape=True))
+ if team_name is not None:
+ stmt = stmt.where(Variable.team_name == team_name)
+
+ total_entries =
session.scalar(select(func.count()).select_from(stmt.subquery())) or 0
+ keys = session.scalars(stmt.offset(offset).limit(limit)).all()
+ return VariableKeysResponse(keys=list(keys), total_entries=total_entries)
+
+
@router.get(
"/{variable_key:path}",
+ dependencies=[Depends(has_variable_access)],
responses={
+ status.HTTP_404_NOT_FOUND: {"description": "Variable not found"},
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access
to the variable"},
},
@@ -90,8 +131,10 @@ def get_variable(
@router.put(
"/{variable_key:path}",
+ dependencies=[Depends(has_variable_access)],
status_code=status.HTTP_201_CREATED,
responses={
+ status.HTTP_404_NOT_FOUND: {"description": "Variable not found"},
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access
to the variable"},
},
@@ -108,8 +151,10 @@ def put_variable(
@router.delete(
"/{variable_key:path}",
+ dependencies=[Depends(has_variable_access)],
status_code=status.HTTP_204_NO_CONTENT,
responses={
+ status.HTTP_404_NOT_FOUND: {"description": "Variable not found"},
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access
to the variable"},
},
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index 160b9ca9503..ab995da52d0 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -46,9 +46,11 @@ from airflow.api_fastapi.execution_api.versions.v2026_06_16
import (
AddStateEndpoints,
AddTeamNameField,
)
+from airflow.api_fastapi.execution_api.versions.v2026_06_30 import
AddVariableKeysEndpoint
bundle = VersionBundle(
HeadVersion(),
+ Version("2026-06-30", AddVariableKeysEndpoint),
Version(
"2026-06-16",
AddRetryPolicyFields,
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
similarity index 64%
copy from
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
copy to
airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
index fd49a5eae46..0bc300a4998 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
@@ -17,20 +17,12 @@
from __future__ import annotations
-from pydantic import Field
+from cadwyn import VersionChange, endpoint
-from airflow.api_fastapi.core_api.base import StrictBaseModel
+class AddVariableKeysEndpoint(VersionChange):
+ """Add GET /variables/keys endpoint for listing variable keys with
optional prefix filter."""
-class VariableResponse(StrictBaseModel):
- """Variable schema for responses with fields that are needed for
Runtime."""
+ description = __doc__
- key: str
- val: str | None = Field(alias="value")
-
-
-class VariablePostBody(StrictBaseModel):
- """Request body schema for creating variables."""
-
- value: str | None = Field(alias="val")
- description: str | None = Field(default=None)
+ instructions_to_migrate_to_previous_version = (endpoint("/variables/keys",
["GET"]).didnt_exist,)
diff --git a/airflow-core/src/airflow/dag_processing/processor.py
b/airflow-core/src/airflow/dag_processing/processor.py
index aa9f07411f8..d7c0a9d2b59 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -50,6 +50,7 @@ from airflow.sdk.execution_time.comms import (
GetTaskStates,
GetTICount,
GetVariable,
+ GetVariableKeys,
GetXCom,
GetXComCount,
GetXComSequenceItem,
@@ -61,6 +62,7 @@ from airflow.sdk.execution_time.comms import (
PrevSuccessfulDagRunResult,
PutVariable,
TaskStatesResult,
+ VariableKeysResult,
VariableResult,
XComCountResponse,
XComResult,
@@ -128,6 +130,7 @@ ToManager = Annotated[
DagFileParsingResult
| GetConnection
| GetVariable
+ | GetVariableKeys
| PutVariable
| GetTaskStates
| GetTICount
@@ -147,6 +150,7 @@ ToDagProcessor = Annotated[
DagFileParseRequest
| ConnectionResult
| VariableResult
+ | VariableKeysResult
| TaskStatesResult
| PreviousDagRunResult
| PreviousTIResult
@@ -628,6 +632,10 @@ class DagFileProcessorProcess(WatchedSubprocess):
dump_opts = {"exclude_unset": True}
else:
resp = var
+ elif isinstance(msg, GetVariableKeys):
+ from airflow.sdk.execution_time.request_handlers import
handle_get_variable_keys
+
+ resp, dump_opts = handle_get_variable_keys(self.client, msg)
elif isinstance(msg, PutVariable):
self.client.variables.set(msg.key, msg.value, msg.description)
elif isinstance(msg, DeleteVariable):
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 9edde3b276e..2b4db481c26 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -74,6 +74,7 @@ from airflow.sdk.execution_time.comms import (
GetTaskStates,
GetTICount,
GetVariable,
+ GetVariableKeys,
GetXCom,
MaskSecret,
OKResponse,
@@ -82,6 +83,7 @@ from airflow.sdk.execution_time.comms import (
TaskStatesResult,
TICount,
UpdateHITLDetail,
+ VariableKeysResult,
VariableResult,
XComResult,
_new_encoder,
@@ -90,6 +92,7 @@ from airflow.sdk.execution_time.comms import (
from airflow.sdk.execution_time.request_handlers import (
handle_get_connection,
handle_get_variable,
+ handle_get_variable_keys,
handle_mask_secret,
)
from airflow.sdk.execution_time.supervisor import WatchedSubprocess,
make_buffered_socket_reader
@@ -302,6 +305,7 @@ ToTriggerRunner = Annotated[
| messages.TriggerStateSync
| ConnectionResult
| VariableResult
+ | VariableKeysResult
| XComResult
| DagRunStateResult
| DRCount
@@ -323,6 +327,7 @@ ToTriggerSupervisor = Annotated[
| GetConnection
| DeleteVariable
| GetVariable
+ | GetVariableKeys
| PutVariable
| DeleteXCom
| GetXCom
@@ -534,6 +539,8 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
resp = self.client.variables.delete(msg.key)
elif isinstance(msg, GetVariable):
resp, dump_opts = handle_get_variable(self.client, msg)
+ elif isinstance(msg, GetVariableKeys):
+ resp, dump_opts = handle_get_variable_keys(self.client, msg)
elif isinstance(msg, PutVariable):
self.client.variables.set(msg.key, msg.value, msg.description)
elif isinstance(msg, DeleteXCom):
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_variables.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_variables.py
index fe761163635..f078d6c2fe0 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_variables.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_variables.py
@@ -244,6 +244,70 @@ class TestPutVariable:
assert any(msg.startswith("Checking write access for task instance")
for msg in caplog.messages)
+class TestGetVariableKeys:
+ @pytest.mark.parametrize(
+ ("prefix", "expected_keys"),
+ [
+ pytest.param(
+ None,
+ {"prod_db_url", "prod_api_key", "dev_debug", "prodXdb",
"50%off", "50abc"},
+ id="no-prefix",
+ ),
+ pytest.param("prod_", {"prod_db_url", "prod_api_key"},
id="underscore-is-literal"),
+ pytest.param("50%", {"50%off"}, id="percent-is-literal"),
+ pytest.param("staging_", set(), id="no-match"),
+ ],
+ )
+ def test_get_variable_keys(self, client, session, prefix, expected_keys):
+ Variable.set(key="prod_db_url", value="postgres://...",
session=session)
+ Variable.set(key="prod_api_key", value="secret", session=session)
+ Variable.set(key="dev_debug", value="true", session=session)
+ Variable.set(key="prodXdb", value="x", session=session)
+ Variable.set(key="50%off", value="x", session=session)
+ Variable.set(key="50abc", value="x", session=session)
+ session.commit()
+
+ params = {"prefix": prefix} if prefix is not None else {}
+ response = client.get("/execution/variables/keys", params=params)
+
+ assert response.status_code == 200
+ body = response.json()
+ assert set(body["keys"]) == expected_keys
+ assert body["total_entries"] == len(expected_keys)
+
+ def test_get_variable_keys_empty_db(self, client):
+ response = client.get("/execution/variables/keys")
+
+ assert response.status_code == 200
+ assert response.json() == {"keys": [], "total_entries": 0}
+
+ def test_get_variable_keys_paginates_with_limit_and_offset(self, client,
session):
+ for i in range(5):
+ Variable.set(key=f"k{i}", value=str(i), session=session)
+ session.commit()
+
+ first = client.get("/execution/variables/keys", params={"limit": 2,
"offset": 0})
+ second = client.get("/execution/variables/keys", params={"limit": 2,
"offset": 2})
+ third = client.get("/execution/variables/keys", params={"limit": 2,
"offset": 4})
+
+ # Order is stable (sorted by key) so pagination is deterministic.
+ assert first.json() == {"keys": ["k0", "k1"], "total_entries": 5}
+ assert second.json() == {"keys": ["k2", "k3"], "total_entries": 5}
+ assert third.json() == {"keys": ["k4"], "total_entries": 5}
+
+ @pytest.mark.parametrize(
+ ("params", "expected_status"),
+ [
+ pytest.param({"limit": 0}, 422, id="limit-below-min"),
+ pytest.param({"limit": 10_001}, 422, id="limit-above-max"),
+ pytest.param({"offset": -1}, 422, id="offset-negative"),
+ ],
+ )
+ def test_get_variable_keys_validates_pagination_bounds(self, client,
params, expected_status):
+ response = client.get("/execution/variables/keys", params=params)
+ assert response.status_code == expected_status
+
+
class TestDeleteVariable:
@pytest.mark.parametrize(
("keys_to_create", "key_to_delete"),
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/__init__.py
similarity index 61%
copy from
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
copy to
airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/__init__.py
index fd49a5eae46..13a83393a91 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/__init__.py
@@ -14,23 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-from __future__ import annotations
-
-from pydantic import Field
-
-from airflow.api_fastapi.core_api.base import StrictBaseModel
-
-
-class VariableResponse(StrictBaseModel):
- """Variable schema for responses with fields that are needed for
Runtime."""
-
- key: str
- val: str | None = Field(alias="value")
-
-
-class VariablePostBody(StrictBaseModel):
- """Request body schema for creating variables."""
-
- value: str | None = Field(alias="val")
- description: str | None = Field(default=None)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_variables.py
similarity index 64%
copy from
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
copy to
airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_variables.py
index fd49a5eae46..c1494512290 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_variables.py
@@ -17,20 +17,19 @@
from __future__ import annotations
-from pydantic import Field
+import pytest
-from airflow.api_fastapi.core_api.base import StrictBaseModel
+pytestmark = pytest.mark.db_test
-class VariableResponse(StrictBaseModel):
- """Variable schema for responses with fields that are needed for
Runtime."""
[email protected]
+def old_ver_client(client):
+ """Last released execution API before `GET /variables/keys` was added."""
+ client.headers["Airflow-API-Version"] = "2026-06-16"
+ return client
- key: str
- val: str | None = Field(alias="value")
+def
test_variable_keys_endpoint_not_available_in_previous_version(old_ver_client):
+ response = old_ver_client.get("/execution/variables/keys")
-class VariablePostBody(StrictBaseModel):
- """Request body schema for creating variables."""
-
- value: str | None = Field(alias="val")
- description: str | None = Field(default=None)
+ assert response.status_code == 404
diff --git a/task-sdk/src/airflow/sdk/api/client.py
b/task-sdk/src/airflow/sdk/api/client.py
index 493225b4699..269978ac9dd 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -75,6 +75,7 @@ from airflow.sdk.api.datamodels._generated import (
TITerminalStatePayload,
TriggerDAGRunPayload,
ValidationError as RemoteValidationError,
+ VariableKeysResponse,
VariablePostBody,
VariableResponse,
XComResponse,
@@ -505,6 +506,14 @@ class VariableOperations:
# decouple from the server response string
return OKResponse(ok=True)
+ def keys(self, prefix: str | None = None, limit: int = 1000, offset: int =
0) -> VariableKeysResponse:
+ """List variable keys from the API server, optionally filtered by key
prefix."""
+ params: dict[str, str | int] = {"limit": limit, "offset": offset}
+ if prefix is not None:
+ params["prefix"] = prefix
+ resp = self.client.get("variables/keys", params=params)
+ return VariableKeysResponse.model_validate_json(resp.read())
+
class XComOperations:
__slots__ = ("client",)
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index b5b100154c3..9f1dadeef51 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -27,7 +27,7 @@ from uuid import UUID
from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue,
RootModel
-API_VERSION: Final[str] = "2026-06-16"
+API_VERSION: Final[str] = "2026-06-30"
class AssetAliasReferenceAssetEventDagRun(BaseModel):
@@ -442,6 +442,18 @@ class ValidationError(BaseModel):
ctx: Annotated[dict[str, Any] | None, Field(title="Context")] = None
+class VariableKeysResponse(BaseModel):
+ """
+ Variable keys schema for list responses.
+ """
+
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ keys: Annotated[list[str], Field(title="Keys")]
+ total_entries: Annotated[int, Field(title="Total Entries")]
+
+
class VariablePostBody(BaseModel):
"""
Request body schema for creating variables.
diff --git a/task-sdk/src/airflow/sdk/definitions/variable.py
b/task-sdk/src/airflow/sdk/definitions/variable.py
index 2e4c9aae3ca..a379022a909 100644
--- a/task-sdk/src/airflow/sdk/definitions/variable.py
+++ b/task-sdk/src/airflow/sdk/definitions/variable.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import logging
+from collections.abc import Sequence
from typing import Any
import attrs
@@ -67,6 +68,30 @@ class Variable:
except AirflowRuntimeError as e:
log.exception(e)
+ @classmethod
+ def keys(cls, prefix: str | None = None) -> Sequence[str]:
+ """
+ Return Variable keys that start with the given prefix.
+
+ The keys are fetched lazily on first access (iteration, indexing, len,
etc.)
+ and cached for subsequent access.
+
+ .. note::
+ Only keys stored in the metadata database are returned — secrets
backends
+ are **not** consulted. This asymmetry with :meth:`get` (which does
consult
+ secrets backends) is a deliberate design decision: most secrets
backends
+ either do not expose a listing API at all, or do so inefficiently
and
+ without prefix filtering. See
+ https://github.com/apache/airflow/issues/61166 for context.
+
+ :param prefix: Optional key prefix to filter by. If None, all keys are
returned.
+ """
+ import lazy_object_proxy
+
+ from airflow.sdk.execution_time.context import _get_variable_keys
+
+ return lazy_object_proxy.Proxy(lambda:
_get_variable_keys(prefix=prefix))
+
@classmethod
def delete(cls, key: str) -> None:
from airflow.sdk.exceptions import AirflowRuntimeError
diff --git a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
index 94d84193192..579833f413d 100644
--- a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
@@ -34,11 +34,13 @@ from airflow.sdk.execution_time.comms import (
ErrorResponse,
GetConnection,
GetVariable,
+ GetVariableKeys,
MaskSecret,
)
from airflow.sdk.execution_time.request_handlers import (
handle_get_connection,
handle_get_variable,
+ handle_get_variable_keys,
handle_mask_secret,
)
from airflow.sdk.execution_time.supervisor import (
@@ -72,7 +74,7 @@ log: FilteringBoundLogger =
structlog.get_logger(logger_name="callback_superviso
# This is a minimal subset of ToSupervisor: read-only access to Connections
# and Variables, plus MaskSecret for the secrets masker.
CallbackToSupervisor = Annotated[
- GetConnection | GetVariable | MaskSecret,
+ GetConnection | GetVariable | GetVariableKeys | MaskSecret,
Field(discriminator="type"),
]
@@ -284,6 +286,8 @@ class CallbackSubprocess(WatchedSubprocess):
resp, dump_opts = handle_get_connection(self.client, msg)
elif isinstance(msg, GetVariable):
resp, dump_opts = handle_get_variable(self.client, msg)
+ elif isinstance(msg, GetVariableKeys):
+ resp, dump_opts = handle_get_variable_keys(self.client, msg)
elif isinstance(msg, MaskSecret):
handle_mask_secret(msg)
else:
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index 01528c728b1..a30872a6a54 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -597,6 +597,12 @@ class AssetsByAliasResult(BaseModel):
)
+class VariableKeysResult(BaseModel):
+ keys: list[str]
+ total_entries: int
+ type: Literal["VariableKeysResult"] = "VariableKeysResult"
+
+
class DagRunResult(DagRun):
type: Literal["DagRunResult"] = "DagRunResult"
@@ -783,6 +789,7 @@ ToTask = Annotated[
| TaskBreadcrumbsResult
| TaskStatesResult
| VariableResult
+ | VariableKeysResult
| XComCountResponse
| XComResult
| XComSequenceIndexResult
@@ -990,6 +997,13 @@ class GetVariable(BaseModel):
type: Literal["GetVariable"] = "GetVariable"
+class GetVariableKeys(BaseModel):
+ prefix: str | None = None
+ limit: int = 1000
+ offset: int = 0
+ type: Literal["GetVariableKeys"] = "GetVariableKeys"
+
+
class PutVariable(BaseModel):
key: str
value: str | None
@@ -1204,6 +1218,7 @@ ToSupervisor = Annotated[
| GetTaskBreadcrumbs
| GetTaskStates
| GetVariable
+ | GetVariableKeys
| GetXCom
| GetXComCount
| GetXComSequenceItem
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py
b/task-sdk/src/airflow/sdk/execution_time/context.py
index 7fdd8bddc7f..1aaa68ed795 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -277,6 +277,35 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
)
+_VARIABLE_KEYS_PAGE_SIZE = 1000
+
+
+def _get_variable_keys(prefix: str | None = None) -> list[str]:
+ from airflow.sdk.exceptions import AirflowRuntimeError
+ from airflow.sdk.execution_time.comms import (
+ ErrorResponse,
+ GetVariableKeys,
+ VariableKeysResult,
+ )
+ from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+ all_keys: list[str] = []
+ offset = 0
+ while True:
+ msg = SUPERVISOR_COMMS.send(
+ GetVariableKeys(prefix=prefix, limit=_VARIABLE_KEYS_PAGE_SIZE,
offset=offset)
+ )
+ if isinstance(msg, ErrorResponse):
+ raise AirflowRuntimeError(msg)
+ if not isinstance(msg, VariableKeysResult):
+ raise TypeError(f"Unexpected response type for GetVariableKeys:
{type(msg).__name__}")
+ all_keys.extend(msg.keys)
+ if len(msg.keys) < _VARIABLE_KEYS_PAGE_SIZE:
+ break
+ offset += len(msg.keys)
+ return all_keys
+
+
def _set_variable(key: str, value: Any, description: str | None = None,
serialize_json: bool = False) -> None:
# TODO: This should probably be moved to a separate module like
`airflow.sdk.execution_time.comms`
# or `airflow.sdk.execution_time.variable`
diff --git a/task-sdk/src/airflow/sdk/execution_time/request_handlers.py
b/task-sdk/src/airflow/sdk/execution_time/request_handlers.py
index eed3e840e39..fbd0e1cee58 100644
--- a/task-sdk/src/airflow/sdk/execution_time/request_handlers.py
+++ b/task-sdk/src/airflow/sdk/execution_time/request_handlers.py
@@ -37,7 +37,9 @@ from airflow.sdk.execution_time.comms import (
ConnectionResult,
GetConnection,
GetVariable,
+ GetVariableKeys,
MaskSecret,
+ VariableKeysResult,
VariableResult,
)
from airflow.sdk.log import mask_secret
@@ -70,6 +72,17 @@ def handle_get_variable(client: Client, msg: GetVariable) ->
tuple[BaseModel | N
return var, {}
+def handle_get_variable_keys(
+ client: Client, msg: GetVariableKeys
+) -> tuple[BaseModel | None, dict[str, bool]]:
+ """Fetch variable keys filtered by prefix."""
+ result = client.variables.keys(prefix=msg.prefix, limit=msg.limit,
offset=msg.offset)
+ return (
+ VariableKeysResult(keys=result.keys,
total_entries=result.total_entries, type="VariableKeysResult"),
+ {"exclude_unset": True},
+ )
+
+
def handle_mask_secret(msg: MaskSecret) -> None:
"""Register a value with the secrets masker."""
mask_secret(msg.value, msg.name)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 757a73b7e2e..3e6236c5786 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -101,6 +101,7 @@ from airflow.sdk.execution_time.comms import (
GetTaskStates,
GetTICount,
GetVariable,
+ GetVariableKeys,
GetXCom,
GetXComCount,
GetXComSequenceItem,
@@ -140,6 +141,7 @@ from airflow.sdk.execution_time.comms import (
from airflow.sdk.execution_time.request_handlers import (
handle_get_connection,
handle_get_variable,
+ handle_get_variable_keys,
handle_mask_secret,
)
@@ -1455,6 +1457,8 @@ class ActivitySubprocess(WatchedSubprocess):
resp, dump_opts = handle_get_connection(self.client, msg)
elif isinstance(msg, GetVariable):
resp, dump_opts = handle_get_variable(self.client, msg)
+ elif isinstance(msg, GetVariableKeys):
+ resp, dump_opts = handle_get_variable_keys(self.client, msg)
elif isinstance(msg, GetXCom):
xcom = self.client.xcoms.get(
msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index,
msg.include_prior_dates
diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py
b/task-sdk/tests/task_sdk/definitions/test_variables.py
index 3717f834735..6e94ccf503f 100644
--- a/task-sdk/tests/task_sdk/definitions/test_variables.py
+++ b/task-sdk/tests/task_sdk/definitions/test_variables.py
@@ -25,7 +25,7 @@ import pytest
from airflow.sdk import Variable
from airflow.sdk.configuration import initialize_secrets_backends
-from airflow.sdk.execution_time.comms import PutVariable, VariableResult
+from airflow.sdk.execution_time.comms import GetVariableKeys, PutVariable,
VariableKeysResult, VariableResult
from airflow.sdk.execution_time.secrets import
DEFAULT_SECRETS_SEARCH_PATH_WORKERS
from tests_common.test_utils.config import conf_vars
@@ -90,6 +90,109 @@ class TestVariables:
)
+class TestVariableKeys:
+ @pytest.mark.parametrize(
+ ("prefix", "keys"),
+ [
+ pytest.param(
+ None,
+ ["prod_db", "prod_api", "dev_debug"],
+ id="all",
+ ),
+ pytest.param(
+ "prod_",
+ ["prod_db", "prod_api"],
+ id="with-prefix",
+ ),
+ pytest.param(
+ "nonexistent_",
+ [],
+ id="empty-result",
+ ),
+ ],
+ )
+ def test_keys(self, prefix, keys, mock_supervisor_comms):
+ mock_supervisor_comms.send.return_value =
VariableKeysResult(keys=keys, total_entries=len(keys))
+
+ results = Variable.keys(prefix=prefix)
+
+ # keys() is lazy — no API call until the proxy is accessed
+ mock_supervisor_comms.send.assert_not_called()
+
+ materialized = list(results)
+
+ mock_supervisor_comms.send.assert_called_once_with(
+ msg=GetVariableKeys(prefix=prefix, limit=1000, offset=0)
+ )
+ assert materialized == keys
+
+ def test_keys_cached_after_first_access(self, mock_supervisor_comms):
+ mock_supervisor_comms.send.return_value =
VariableKeysResult(keys=["a", "b"], total_entries=2)
+
+ results = Variable.keys(prefix="x_")
+
+ # Multiple accesses should only trigger the API call once
+ list(results)
+ list(results)
+ len(results)
+
+ mock_supervisor_comms.send.assert_called_once_with(
+ msg=GetVariableKeys(prefix="x_", limit=1000, offset=0)
+ )
+
+ def test_keys_paginates_when_results_exceed_page_size(self,
mock_supervisor_comms):
+ # Simulate two full pages followed by a short page (signals end).
+ from airflow.sdk.execution_time.context import _VARIABLE_KEYS_PAGE_SIZE
+
+ page1 = [f"k{i}" for i in range(_VARIABLE_KEYS_PAGE_SIZE)]
+ page2 = [f"k{i}" for i in range(_VARIABLE_KEYS_PAGE_SIZE,
_VARIABLE_KEYS_PAGE_SIZE * 2)]
+ page3 = ["last_key"]
+ total = _VARIABLE_KEYS_PAGE_SIZE * 2 + 1
+ mock_supervisor_comms.send.side_effect = [
+ VariableKeysResult(keys=page1, total_entries=total),
+ VariableKeysResult(keys=page2, total_entries=total),
+ VariableKeysResult(keys=page3, total_entries=total),
+ ]
+
+ materialized = list(Variable.keys(prefix=None))
+
+ assert materialized == page1 + page2 + page3
+ assert mock_supervisor_comms.send.call_count == 3
+ mock_supervisor_comms.send.assert_any_call(
+ msg=GetVariableKeys(prefix=None, limit=_VARIABLE_KEYS_PAGE_SIZE,
offset=0)
+ )
+ mock_supervisor_comms.send.assert_any_call(
+ msg=GetVariableKeys(prefix=None, limit=_VARIABLE_KEYS_PAGE_SIZE,
offset=_VARIABLE_KEYS_PAGE_SIZE)
+ )
+ mock_supervisor_comms.send.assert_any_call(
+ msg=GetVariableKeys(
+ prefix=None, limit=_VARIABLE_KEYS_PAGE_SIZE,
offset=_VARIABLE_KEYS_PAGE_SIZE * 2
+ )
+ )
+
+ def test_keys_raises_on_error_response(self, mock_supervisor_comms):
+ from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
+ from airflow.sdk.execution_time.comms import ErrorResponse
+
+ mock_supervisor_comms.send.return_value = ErrorResponse(
+ error=ErrorType.GENERIC_ERROR, detail={"message": "boom"}
+ )
+
+ results = Variable.keys(prefix="x_")
+
+ with pytest.raises(AirflowRuntimeError):
+ list(results)
+
+ def test_keys_raises_on_unexpected_response_type(self,
mock_supervisor_comms):
+ # Mimic a transport / version-mismatch response that isn't
VariableKeysResult.
+ mock_supervisor_comms.send.return_value = VariableResult(key="x",
value="y")
+
+ results = Variable.keys(prefix="x_")
+
+ with pytest.raises(TypeError, match="Unexpected response type"):
+ list(results)
+
+
class TestVariableFromSecrets:
def test_var_get_from_secrets_found(self, mock_supervisor_comms, tmp_path):
"""Tests getting a variable from secrets backend."""
diff --git a/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
index 8cb9fdcc816..523a7d0c604 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
@@ -33,7 +33,9 @@ from airflow.sdk.execution_time.comms import (
ConnectionResult,
GetConnection,
GetVariable,
+ GetVariableKeys,
MaskSecret,
+ VariableKeysResult,
VariableResult,
_RequestFrame,
)
@@ -180,6 +182,15 @@ class TestCallbackHandleRequest:
response=VariableResult(key="test_key", value="test_value"),
),
),
+ RequestCase(
+ message=GetVariableKeys(prefix="test_"),
+ test_id="get_variable_keys",
+ client_mock=ClientMock(
+ method_path="variables.keys",
+ kwargs={"prefix": "test_", "limit": 1000, "offset": 0},
+ response=VariableKeysResult(keys=["test_key"],
total_entries=1),
+ ),
+ ),
RequestCase(
message=MaskSecret(value="super_secret", name="api_key"),
test_id="mask_secret",
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index a8f97f81ac2..b54477b7769 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -113,6 +113,7 @@ from airflow.sdk.execution_time.comms import (
GetTaskStates,
GetTICount,
GetVariable,
+ GetVariableKeys,
GetXCom,
GetXComCount,
GetXComSequenceItem,
@@ -147,6 +148,7 @@ from airflow.sdk.execution_time.comms import (
TriggerDagRun,
UpdateHITLDetail,
ValidateInletsAndOutlets,
+ VariableKeysResult,
VariableResult,
XComCountResponse,
XComResult,
@@ -1568,6 +1570,20 @@ REQUEST_TEST_CASES = [
),
expected_body={"ok": True, "type": "OKResponse"},
),
+ RequestTestCase(
+ message=GetVariableKeys(prefix="test_"),
+ test_id="get_variable_keys",
+ client_mock=ClientMock(
+ method_path="variables.keys",
+ kwargs={"prefix": "test_", "limit": 1000, "offset": 0},
+ response=VariableKeysResult(keys=["test_key"], total_entries=1),
+ ),
+ expected_body={
+ "keys": ["test_key"],
+ "total_entries": 1,
+ "type": "VariableKeysResult",
+ },
+ ),
RequestTestCase(
message=DeferTask(next_method="execute_callback",
classpath="my-classpath"),
test_id="patch_task_instance_to_deferred",