This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d32091cd4f4 Check team boundaries in variables (#58905)
d32091cd4f4 is described below
commit d32091cd4f4b2b0935bc71e02e2995cdb8713a30
Author: Vincent <[email protected]>
AuthorDate: Wed Dec 10 13:52:01 2025 -0500
Check team boundaries in variables (#58905)
---
.../core_api/routes/public/variables.py | 8 ++-
.../src/airflow/api_fastapi/execution_api/deps.py | 22 ++++++++
.../api_fastapi/execution_api/routes/variables.py | 19 ++++---
.../src/airflow/cli/commands/team_command.py | 4 ++
airflow-core/src/airflow/models/variable.py | 59 +++++++++++++++-----
.../src/airflow/secrets/environment_variables.py | 9 +++-
.../src/airflow/secrets/local_filesystem.py | 2 +-
airflow-core/src/airflow/secrets/metastore.py | 13 +++--
airflow-core/tests/unit/always/test_secrets.py | 10 ++--
.../tests/unit/cli/commands/test_team_command.py | 4 ++
airflow-core/tests/unit/models/test_variable.py | 62 ++++++++++++++++++++++
.../amazon/aws/secrets/secrets_manager.py | 3 +-
.../amazon/aws/secrets/systems_manager.py | 3 +-
.../google/cloud/secrets/secret_manager.py | 3 +-
.../airflow/providers/hashicorp/secrets/vault.py | 3 +-
.../providers/microsoft/azure/secrets/key_vault.py | 3 +-
.../airflow/providers/yandex/secrets/lockbox.py | 3 +-
.../src/airflow_shared/secrets_backend/base.py | 3 +-
.../sdk/execution_time/secrets/execution_api.py | 4 +-
19 files changed, 196 insertions(+), 41 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
index db78f0e542c..5f9bcaee4d3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
@@ -19,7 +19,7 @@ from __future__ import annotations
from typing import Annotated
from fastapi import Depends, HTTPException, Query, status
-from sqlalchemy import select
+from sqlalchemy import delete, select
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.parameters import (
@@ -62,7 +62,11 @@ def delete_variable(
session: SessionDep,
):
"""Delete a variable entry."""
- if Variable.delete(variable_key, session) == 0:
+ # Like the other endpoints (get, patch), we do not use
Variable.delete/get/set here because these methods
+ # are intended to be used in task execution environment (execution API)
+ result = session.execute(delete(Variable).where(Variable.key ==
variable_key))
+ rows = getattr(result, "rowcount", 0) or 0
+ if rows == 0:
raise HTTPException(
status.HTTP_404_NOT_FOUND, f"The Variable with key:
`{variable_key}` was not found"
)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/deps.py
b/airflow-core/src/airflow/api_fastapi/execution_api/deps.py
index d247a31f5f4..fce188d48ed 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/deps.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/deps.py
@@ -24,9 +24,15 @@ import structlog
import svcs
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPBearer
+from sqlalchemy import select
from airflow.api_fastapi.auth.tokens import JWTValidator
+from airflow.api_fastapi.common.db.common import AsyncSessionDep
from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+from airflow.configuration import conf
+from airflow.models import DagModel, TaskInstance
+from airflow.models.dagbundle import DagBundleModel
+from airflow.models.team import Team
log = structlog.get_logger(logger_name=__name__)
@@ -95,3 +101,19 @@ JWTBearerDep: TIToken = Depends(JWTBearer())
# This checks that the UUID in the url matches the one in the token for us.
JWTBearerTIPathDep = Depends(JWTBearer(path_param_name="task_instance_id"))
+
+
+async def get_team_name_dep(session: AsyncSessionDep, token=JWTBearerDep) ->
str | None:
+ """Return the team name associated to the task (if any)."""
+ if not conf.getboolean("core", "multi_team"):
+ return None
+
+ stmt = (
+ select(Team.name)
+ .select_from(TaskInstance)
+ .join(DagModel, DagModel.dag_id == TaskInstance.dag_id)
+ .join(DagBundleModel, DagBundleModel.name == DagModel.bundle_name)
+ .join(DagBundleModel.teams)
+ .where(TaskInstance.id == str(token.id))
+ )
+ return await session.scalar(stmt)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
index d2f5d21349c..5621b6cd081 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import logging
+from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Path, Request, status
@@ -25,7 +26,7 @@ from airflow.api_fastapi.execution_api.datamodels.variable
import (
VariablePostBody,
VariableResponse,
)
-from airflow.api_fastapi.execution_api.deps import JWTBearerDep
+from airflow.api_fastapi.execution_api.deps import JWTBearerDep,
get_team_name_dep
from airflow.models.variable import Variable
@@ -61,13 +62,15 @@ log = logging.getLogger(__name__)
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access
to the variable"},
},
)
-def get_variable(variable_key: str) -> VariableResponse:
+def get_variable(
+ variable_key: str, team_name: Annotated[str | None,
Depends(get_team_name_dep)]
+) -> VariableResponse:
"""Get an Airflow Variable."""
if not variable_key:
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Not Found")
try:
- variable_value = Variable.get(variable_key)
+ variable_value = Variable.get(variable_key, team_name=team_name)
except KeyError:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
@@ -88,12 +91,14 @@ def get_variable(variable_key: str) -> VariableResponse:
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access
to the variable"},
},
)
-def put_variable(variable_key: str, body: VariablePostBody):
+def put_variable(
+ variable_key: str, body: VariablePostBody, team_name: Annotated[str |
None, Depends(get_team_name_dep)]
+):
"""Set an Airflow Variable."""
if not variable_key:
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Not Found")
- Variable.set(key=variable_key, value=body.value,
description=body.description)
+ Variable.set(key=variable_key, value=body.value,
description=body.description, team_name=team_name)
return {"message": "Variable successfully set"}
@@ -105,9 +110,9 @@ def put_variable(variable_key: str, body: VariablePostBody):
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access
to the variable"},
},
)
-def delete_variable(variable_key: str):
+def delete_variable(variable_key: str, team_name: Annotated[str | None,
Depends(get_team_name_dep)]):
"""Delete an Airflow Variable."""
if not variable_key:
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Not Found")
- Variable.delete(key=variable_key)
+ Variable.delete(key=variable_key, team_name=team_name)
diff --git a/airflow-core/src/airflow/cli/commands/team_command.py
b/airflow-core/src/airflow/cli/commands/team_command.py
index 6a187483bc3..c93013d177a 100644
--- a/airflow-core/src/airflow/cli/commands/team_command.py
+++ b/airflow-core/src/airflow/cli/commands/team_command.py
@@ -19,6 +19,8 @@
from __future__ import annotations
+import re
+
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
@@ -50,6 +52,8 @@ def _extract_team_name(args):
team_name = args.name.strip()
if not team_name:
raise SystemExit("Team name cannot be empty")
+ if not re.match(r"^[a-zA-Z0-9_-]{3,50}$", team_name):
+ raise SystemExit("Invalid team name: must match regex
^[a-zA-Z0-9_-]{3,50}$")
return team_name
diff --git a/airflow-core/src/airflow/models/variable.py
b/airflow-core/src/airflow/models/variable.py
index a6e0ea5cf49..1ccee483470 100644
--- a/airflow-core/src/airflow/models/variable.py
+++ b/airflow-core/src/airflow/models/variable.py
@@ -24,7 +24,7 @@ import sys
import warnings
from typing import TYPE_CHECKING, Any
-from sqlalchemy import Boolean, ForeignKey, Integer, String, Text, delete,
select
+from sqlalchemy import Boolean, ForeignKey, Integer, String, Text, delete,
or_, select
from sqlalchemy.dialects.mysql import MEDIUMTEXT
from sqlalchemy.orm import Mapped, declared_attr, reconstructor, synonym
@@ -139,6 +139,7 @@ class Variable(Base, LoggingMixin):
key: str,
default_var: Any = __NO_DEFAULT_SENTINEL,
deserialize_json: bool = False,
+ team_name: str | None = None,
) -> Any:
"""
Get a value for an Airflow Variable Key.
@@ -146,6 +147,7 @@ class Variable(Base, LoggingMixin):
:param key: Variable Key
:param default_var: Default value of the Variable if the Variable
doesn't exist
:param deserialize_json: Deserialize the value to a Python dict
+ :param team_name: Team name associated to the task trying to access
the variable (if any)
"""
# TODO: This is not the best way of having compat, but it's "better
than erroring" for now. This still
# means SQLA etc is loaded, but we can't avoid that unless/until we
add import shims as a big
@@ -169,7 +171,12 @@ class Variable(Base, LoggingMixin):
return var_val
- var_val = Variable.get_variable_from_secrets(key=key)
+ if team_name and not conf.getboolean("core", "multi_team"):
+ raise ValueError(
+ "Multi-team mode is not configured in the Airflow environment
but the task trying to access the variable belongs to a team"
+ )
+
+ var_val = Variable.get_variable_from_secrets(key=key,
team_name=team_name)
if var_val is None:
if default_var is not cls.__NO_DEFAULT_SENTINEL:
return default_var
@@ -319,6 +326,7 @@ class Variable(Base, LoggingMixin):
key: str,
value: Any,
serialize_json: bool = False,
+ team_name: str | None = None,
session: Session | None = None,
) -> None:
"""
@@ -327,6 +335,7 @@ class Variable(Base, LoggingMixin):
:param key: Variable Key
:param value: Value to set for the Variable
:param serialize_json: Serialize the value to a JSON string
+ :param team_name: Team name associated to the variable (if any)
:param session: optional session, use if provided or create a new one
"""
# TODO: This is not the best way of having compat, but it's "better
than erroring" for now. This still
@@ -352,9 +361,14 @@ class Variable(Base, LoggingMixin):
)
return
+ if team_name and not conf.getboolean("core", "multi_team"):
+ raise ValueError(
+ "Multi-team mode is not configured in the Airflow environment.
To assign a team to a variable, multi-mode must be enabled."
+ )
+
Variable.check_for_write_conflict(key=key)
- if Variable.get_variable_from_secrets(key=key) is None:
+ if Variable.get_variable_from_secrets(key=key, team_name=team_name) is
None:
raise KeyError(f"Variable {key} does not exist")
ctx: contextlib.AbstractContextManager
@@ -364,7 +378,11 @@ class Variable(Base, LoggingMixin):
ctx = create_session()
with ctx as session:
- obj = session.scalar(select(Variable).where(Variable.key == key))
+ obj = session.scalar(
+ select(Variable).where(
+ Variable.key == key, or_(Variable.team_name == team_name,
Variable.team_name.is_(None))
+ )
+ )
if obj is None:
raise AttributeError(f"Variable {key} does not exist in the
Database and cannot be updated.")
@@ -377,11 +395,12 @@ class Variable(Base, LoggingMixin):
)
@staticmethod
- def delete(key: str, session: Session | None = None) -> int:
+ def delete(key: str, team_name: str | None = None, session: Session | None
= None) -> int:
"""
Delete an Airflow Variable for a given key.
:param key: Variable Keys
+ :param team_name: Team name associated to the task trying to delete
the variable (if any)
:param session: optional session, use if provided or create a new one
"""
# TODO: This is not the best way of having compat, but it's "better
than erroring" for now. This still
@@ -404,6 +423,11 @@ class Variable(Base, LoggingMixin):
)
return 1
+ if team_name and not conf.getboolean("core", "multi_team"):
+ raise ValueError(
+ "Multi-team mode is not configured in the Airflow environment
but the task trying to delete the variable belongs to a team"
+ )
+
ctx: contextlib.AbstractContextManager
if session is not None:
ctx = contextlib.nullcontext(session)
@@ -411,7 +435,11 @@ class Variable(Base, LoggingMixin):
ctx = create_session()
with ctx as session:
- result = session.execute(delete(Variable).where(Variable.key ==
key))
+ result = session.execute(
+ delete(Variable).where(
+ Variable.key == key, or_(Variable.team_name == team_name,
Variable.team_name.is_(None))
+ )
+ )
rows = getattr(result, "rowcount", 0) or 0
SecretCache.invalidate_variable(key)
return rows
@@ -458,25 +486,28 @@ class Variable(Base, LoggingMixin):
return None
@staticmethod
- def get_variable_from_secrets(key: str) -> str | None:
+ def get_variable_from_secrets(key: str, team_name: str | None = None) ->
str | None:
"""
Get Airflow Variable by iterating over all Secret Backends.
:param key: Variable Key
+ :param team_name: Team name associated to the task trying to access
the variable (if any)
:return: Variable Value
"""
- # check cache first
- # enabled only if SecretCache.init() has been called first
- try:
- return SecretCache.get_variable(key)
- except SecretCache.NotPresentException:
- pass # continue business
+ # Disable cache if the variable belongs to a team. We might enable it
later
+ if not team_name:
+ # check cache first
+ # enabled only if SecretCache.init() has been called first
+ try:
+ return SecretCache.get_variable(key)
+ except SecretCache.NotPresentException:
+ pass # continue business
var_val = None
# iterate over backends if not in cache (or expired)
for secrets_backend in ensure_secrets_loaded():
try:
- var_val = secrets_backend.get_variable(key=key)
+ var_val = secrets_backend.get_variable(key=key,
team_name=team_name)
if var_val is not None:
break
except Exception:
diff --git a/airflow-core/src/airflow/secrets/environment_variables.py
b/airflow-core/src/airflow/secrets/environment_variables.py
index e6bd72d4e5a..1973152b426 100644
--- a/airflow-core/src/airflow/secrets/environment_variables.py
+++ b/airflow-core/src/airflow/secrets/environment_variables.py
@@ -33,11 +33,18 @@ class EnvironmentVariablesBackend(BaseSecretsBackend):
def get_conn_value(self, conn_id: str) -> str | None:
return os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
- def get_variable(self, key: str) -> str | None:
+ def get_variable(self, key: str, team_name: str | None = None) -> str |
None:
"""
Get Airflow Variable from Environment Variable.
:param key: Variable Key
+ :param team_name: Team name associated to the task trying to access
the variable (if any)
:return: Variable Value
"""
+ if team_name and (
+ team_var :=
os.environ.get(f"{VAR_ENV_PREFIX}_{team_name.upper()}___" + key.upper())
+ ):
+ # Format to set a team specific variable:
AIRFLOW_VAR__<TEAM_ID>___<VAR_KEY>
+ return team_var
+
return os.environ.get(VAR_ENV_PREFIX + key.upper())
diff --git a/airflow-core/src/airflow/secrets/local_filesystem.py
b/airflow-core/src/airflow/secrets/local_filesystem.py
index efcd3b69b0c..a6780b609dc 100644
--- a/airflow-core/src/airflow/secrets/local_filesystem.py
+++ b/airflow-core/src/airflow/secrets/local_filesystem.py
@@ -349,7 +349,7 @@ class LocalFilesystemBackend(BaseSecretsBackend,
LoggingMixin):
return self._local_connections[conn_id]
return None
- def get_variable(self, key: str) -> str | None:
+ def get_variable(self, key: str, team_name: str | None = None) -> str |
None:
return self._local_variables.get(key)
def get_config(self, key: str) -> str | None:
diff --git a/airflow-core/src/airflow/secrets/metastore.py
b/airflow-core/src/airflow/secrets/metastore.py
index 21a26104dde..4ba3082ac31 100644
--- a/airflow-core/src/airflow/secrets/metastore.py
+++ b/airflow-core/src/airflow/secrets/metastore.py
@@ -21,7 +21,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING
-from sqlalchemy import select
+from sqlalchemy import or_, select
from airflow.secrets import BaseSecretsBackend
from airflow.utils.session import NEW_SESSION, provide_session
@@ -51,17 +51,24 @@ class MetastoreBackend(BaseSecretsBackend):
return conn
@provide_session
- def get_variable(self, key: str, session: Session = NEW_SESSION) -> str |
None:
+ def get_variable(
+ self, key: str, team_name: str | None = None, session: Session =
NEW_SESSION
+ ) -> str | None:
"""
Get Airflow Variable from Metadata DB.
:param key: Variable Key
+ :param team_name: Team name associated to the task trying to access
the variable (if any)
:param session: SQLAlchemy Session
:return: Variable Value
"""
from airflow.models import Variable
- var_value = session.scalar(select(Variable).where(Variable.key ==
key).limit(1))
+ var_value = session.scalar(
+ select(Variable)
+ .where(Variable.key == key, or_(Variable.team_name == team_name,
Variable.team_name.is_(None)))
+ .limit(1)
+ )
session.expunge_all()
if var_value:
return var_value.val
diff --git a/airflow-core/tests/unit/always/test_secrets.py
b/airflow-core/tests/unit/always/test_secrets.py
index aa02573e7d0..7ef36733517 100644
--- a/airflow-core/tests/unit/always/test_secrets.py
+++ b/airflow-core/tests/unit/always/test_secrets.py
@@ -142,8 +142,8 @@ class TestVariableFromSecrets:
Variable.get_variable_from_secrets("fake_var_key")
- mock_meta_get.assert_called_once_with(key="fake_var_key")
- mock_env_get.assert_called_once_with(key="fake_var_key")
+ mock_meta_get.assert_called_once_with(key="fake_var_key",
team_name=None)
+ mock_env_get.assert_called_once_with(key="fake_var_key",
team_name=None)
@mock.patch("airflow.secrets.metastore.MetastoreBackend.get_variable")
@mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable")
@@ -154,7 +154,7 @@ class TestVariableFromSecrets:
"""
mock_env_get.return_value = "something"
Variable.get_variable_from_secrets("fake_var_key")
- mock_env_get.assert_called_once_with(key="fake_var_key")
+ mock_env_get.assert_called_once_with(key="fake_var_key",
team_name=None)
mock_meta_get.assert_not_called()
def test_backend_fallback_to_default_var(self):
@@ -194,13 +194,13 @@ class TestVariableFromSecrets:
mock_meta_get.return_value = None
assert Variable.get(key="MYVAR") == "a_venv_value"
- mock_secret_get.assert_called_with(key="MYVAR")
+ mock_secret_get.assert_called_with(key="MYVAR", team_name=None)
mock_meta_get.assert_not_called()
mock_secret_get.return_value = None
mock_meta_get.return_value = "a_metastore_value"
assert Variable.get(key="not_myvar") == "a_metastore_value"
- mock_meta_get.assert_called_once_with(key="not_myvar")
+ mock_meta_get.assert_called_once_with(key="not_myvar", team_name=None)
mock_secret_get.return_value = "a_secret_value"
assert Variable.get(key="not_myvar") == "a_secret_value"
diff --git a/airflow-core/tests/unit/cli/commands/test_team_command.py
b/airflow-core/tests/unit/cli/commands/test_team_command.py
index c9fe75be02d..046e58de188 100644
--- a/airflow-core/tests/unit/cli/commands/test_team_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_team_command.py
@@ -81,6 +81,10 @@ class TestCliTeams:
with pytest.raises(SystemExit, match="Team name cannot be empty"):
team_command.team_create(self.parser.parse_args(["teams",
"create", ""]))
+ def test_team_create_invalid_name(self):
+ with pytest.raises(SystemExit, match="Invalid team name"):
+ team_command.team_create(self.parser.parse_args(["teams",
"create", "test with space"]))
+
def test_team_create_whitespace_name(self):
"""Test team creation with whitespace-only name."""
with pytest.raises(SystemExit, match="Team name cannot be empty"):
diff --git a/airflow-core/tests/unit/models/test_variable.py
b/airflow-core/tests/unit/models/test_variable.py
index 12d63c1ab77..3e2e3cce1f3 100644
--- a/airflow-core/tests/unit/models/test_variable.py
+++ b/airflow-core/tests/unit/models/test_variable.py
@@ -104,6 +104,24 @@ class TestVariable:
assert test_var.val == test_value
assert Fernet(key2).decrypt(test_var._val.encode()) ==
test_value.encode()
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_get_variable_with_team(self, testing_team, session):
+ Variable.set(key="key", value="value", team_name=testing_team.name,
session=session)
+ result = Variable.get(key="key", team_name=testing_team.name)
+ assert result == "value"
+
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_get_global_variable_with_team(self, testing_team, session):
+ Variable.set(key="key", value="value", session=session)
+ result = Variable.get(key="key", team_name=testing_team.name)
+ assert result == "value"
+
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_get_team_variable_without_team(self, testing_team, session):
+ Variable.set(key="key", value="value", team_name=testing_team.name,
session=session)
+ with pytest.raises(KeyError):
+ Variable.get(key="key")
+
def test_variable_set_get_round_trip(self):
Variable.set("tested_var_set_id", "Monday morning breakfast")
assert Variable.get("tested_var_set_id") == "Monday morning breakfast"
@@ -192,6 +210,24 @@ class TestVariable:
assert test_var.val == "value2"
assert test_var.description == "a test variable"
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_variable_update_with_team(self, testing_team, session):
+ Variable.set(key="test_key", value="value1",
team_name=testing_team.name, session=session)
+ Variable.update(key="test_key", value="value2",
team_name=testing_team.name, session=session)
+ assert Variable.get("test_key", team_name=testing_team.name) ==
"value2"
+
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_variable_update_with_team_global(self, testing_team, session):
+ Variable.set(key="test_key", value="value1", session=session)
+ Variable.update(key="test_key", value="value2",
team_name=testing_team.name, session=session)
+ assert Variable.get("test_key", team_name=testing_team.name) ==
"value2"
+
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_variable_update_with_wrong_team(self, testing_team, session):
+ Variable.set(key="test_key", value="value1",
team_name=testing_team.name, session=session)
+ with pytest.raises(KeyError):
+ Variable.update(key="test_key", value="value2", session=session)
+
def test_set_variable_sets_description(self, session):
Variable.set(key="key", value="value", description="a test variable",
session=session)
test_var = session.query(Variable).filter(Variable.key == "key").one()
@@ -276,6 +312,32 @@ class TestVariable:
with pytest.raises(KeyError):
Variable.get(key)
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_variable_delete_with_team(self, testing_team, session):
+ key = "tested_var_delete"
+ value = "to be deleted"
+
+ # No-op if the variable doesn't exist
+ Variable.delete(key=key, team_name=testing_team.name, session=session)
+ with pytest.raises(KeyError):
+ Variable.get(key)
+
+ # Delete same team variable
+ Variable.set(key=key, value=value, team_name=testing_team.name,
session=session)
+ Variable.delete(key=key, team_name=testing_team.name, session=session)
+ with pytest.raises(KeyError):
+ Variable.get(key)
+
+ # Delete global variable
+ Variable.set(key=key, value=value, session=session)
+ Variable.delete(key=key, team_name=testing_team.name, session=session)
+ with pytest.raises(KeyError):
+ Variable.get(key)
+
+ # Attempt to delete a team variable from another one
+ Variable.set(key=key, value=value, team_name=testing_team.name,
session=session)
+ assert Variable.delete(key=key, session=session) == 0
+
def test_masking_from_db(self, session):
"""Test secrets are masked when loaded directly from the DB"""
# Normally people will use `Variable.get`, but just in case, catch
direct DB access too
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py
b/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py
index 47f99afd787..0d8e80ae4fb 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py
@@ -225,11 +225,12 @@ class SecretsManagerBackend(BaseSecretsBackend,
LoggingMixin):
return standardized_secret
return secret
- def get_variable(self, key: str) -> str | None:
+ def get_variable(self, key: str, team_name: str | None = None) -> str |
None:
"""
Get Airflow Variable.
:param key: Variable Key
+ :param team_name: Team name associated to the task trying to access
the variable (if any)
:return: Variable Value
"""
if self.variables_prefix is None:
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/secrets/systems_manager.py
b/providers/amazon/src/airflow/providers/amazon/aws/secrets/systems_manager.py
index 25fb1606f2b..2a1228116da 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/secrets/systems_manager.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/secrets/systems_manager.py
@@ -143,11 +143,12 @@ class
SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
return self._get_secret(self.connections_prefix, conn_id,
self.connections_lookup_pattern)
- def get_variable(self, key: str) -> str | None:
+ def get_variable(self, key: str, team_name: str | None = None) -> str |
None:
"""
Get Airflow Variable.
:param key: Variable Key
+ :param team_name: Team name associated to the task trying to access
the variable (if any)
:return: Variable Value
"""
if self.variables_prefix is None:
diff --git
a/providers/google/src/airflow/providers/google/cloud/secrets/secret_manager.py
b/providers/google/src/airflow/providers/google/cloud/secrets/secret_manager.py
index c8cdfc6c53b..428f31d064c 100644
---
a/providers/google/src/airflow/providers/google/cloud/secrets/secret_manager.py
+++
b/providers/google/src/airflow/providers/google/cloud/secrets/secret_manager.py
@@ -160,11 +160,12 @@ class CloudSecretManagerBackend(BaseSecretsBackend,
LoggingMixin):
return self._get_secret(self.connections_prefix, conn_id)
- def get_variable(self, key: str) -> str | None:
+ def get_variable(self, key: str, team_name: str | None = None) -> str |
None:
"""
Get Airflow Variable from Environment Variable.
:param key: Variable Key
+ :param team_name: Team name associated to the task trying to access
the variable (if any)
:return: Variable Value
"""
if self.variables_prefix is None:
diff --git
a/providers/hashicorp/src/airflow/providers/hashicorp/secrets/vault.py
b/providers/hashicorp/src/airflow/providers/hashicorp/secrets/vault.py
index 91dcea39e33..627feb137c4 100644
--- a/providers/hashicorp/src/airflow/providers/hashicorp/secrets/vault.py
+++ b/providers/hashicorp/src/airflow/providers/hashicorp/secrets/vault.py
@@ -208,11 +208,12 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
return Connection(conn_id, **response)
- def get_variable(self, key: str) -> str | None:
+ def get_variable(self, key: str, team_name: str | None = None) -> str |
None:
"""
Get Airflow Variable.
:param key: Variable Key
+ :param team_name: Team name associated to the task trying to access
the variable (if any)
:return: Variable Value retrieved from the vault
"""
mount_point, variable_key = self._parse_path(key)
diff --git
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/secrets/key_vault.py
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/secrets/key_vault.py
index 423e5bc247c..64590b6bae6 100644
---
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/secrets/key_vault.py
+++
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/secrets/key_vault.py
@@ -155,11 +155,12 @@ class AzureKeyVaultBackend(BaseSecretsBackend,
LoggingMixin):
return self._get_secret(self.connections_prefix, conn_id)
- def get_variable(self, key: str) -> str | None:
+ def get_variable(self, key: str, team_name: str | None = None) -> str |
None:
"""
Get an Airflow Variable from an Azure Key Vault secret.
:param key: Variable Key
+ :param team_name: Team name associated to the task trying to access
the variable (if any)
:return: Variable Value
"""
if self.variables_prefix is None:
diff --git a/providers/yandex/src/airflow/providers/yandex/secrets/lockbox.py
b/providers/yandex/src/airflow/providers/yandex/secrets/lockbox.py
index d65131ab2cb..9cbf7321294 100644
--- a/providers/yandex/src/airflow/providers/yandex/secrets/lockbox.py
+++ b/providers/yandex/src/airflow/providers/yandex/secrets/lockbox.py
@@ -160,11 +160,12 @@ class LockboxSecretBackend(BaseSecretsBackend,
LoggingMixin):
return self._get_secret_value(self.connections_prefix, conn_id)
- def get_variable(self, key: str) -> str | None:
+ def get_variable(self, key: str, team_name: str | None = None) -> str |
None:
"""
Return value for Airflow Variable.
:param key: Variable Key
+ :param team_name: Team name associated to the task trying to access
the variable (if any)
:return: Variable Value
"""
if self.variables_prefix is None:
diff --git a/shared/secrets_backend/src/airflow_shared/secrets_backend/base.py
b/shared/secrets_backend/src/airflow_shared/secrets_backend/base.py
index 0ae361ffd32..c75e274c598 100644
--- a/shared/secrets_backend/src/airflow_shared/secrets_backend/base.py
+++ b/shared/secrets_backend/src/airflow_shared/secrets_backend/base.py
@@ -44,11 +44,12 @@ class BaseSecretsBackend(ABC):
"""
raise NotImplementedError
- def get_variable(self, key: str) -> str | None:
+ def get_variable(self, key: str, team_name: str | None = None) -> str |
None:
"""
Return value for Airflow Variable.
:param key: Variable Key
+ :param team_name: Team name associated to the task trying to access
the variable (if any)
:return: Variable Value
"""
raise NotImplementedError()
diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
index f9bee875f83..18f80f29dcf 100644
--- a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
+++ b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
@@ -88,11 +88,13 @@ class ExecutionAPISecretsBackend(BaseSecretsBackend):
# to allow fallback to other backends
return None
- def get_variable(self, key: str) -> str | None:
+ def get_variable(self, key: str, team_name: str | None = None) -> str |
None:
"""
Return variable value by routing through SUPERVISOR_COMMS.
:param key: Variable key
+ :param team_id: ID of the team associated to the task trying to access
the variable.
+ Unused here because the team ID is inferred from the task ID
provided in the execution API JWT token.
:return: Variable value or None if not found
"""
from airflow.sdk.execution_time.comms import ErrorResponse,
GetVariable, VariableResult