This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d32091cd4f4 Check team boundaries in variables (#58905)
d32091cd4f4 is described below

commit d32091cd4f4b2b0935bc71e02e2995cdb8713a30
Author: Vincent <[email protected]>
AuthorDate: Wed Dec 10 13:52:01 2025 -0500

    Check team boundaries in variables (#58905)
---
 .../core_api/routes/public/variables.py            |  8 ++-
 .../src/airflow/api_fastapi/execution_api/deps.py  | 22 ++++++++
 .../api_fastapi/execution_api/routes/variables.py  | 19 ++++---
 .../src/airflow/cli/commands/team_command.py       |  4 ++
 airflow-core/src/airflow/models/variable.py        | 59 +++++++++++++++-----
 .../src/airflow/secrets/environment_variables.py   |  9 +++-
 .../src/airflow/secrets/local_filesystem.py        |  2 +-
 airflow-core/src/airflow/secrets/metastore.py      | 13 +++--
 airflow-core/tests/unit/always/test_secrets.py     | 10 ++--
 .../tests/unit/cli/commands/test_team_command.py   |  4 ++
 airflow-core/tests/unit/models/test_variable.py    | 62 ++++++++++++++++++++++
 .../amazon/aws/secrets/secrets_manager.py          |  3 +-
 .../amazon/aws/secrets/systems_manager.py          |  3 +-
 .../google/cloud/secrets/secret_manager.py         |  3 +-
 .../airflow/providers/hashicorp/secrets/vault.py   |  3 +-
 .../providers/microsoft/azure/secrets/key_vault.py |  3 +-
 .../airflow/providers/yandex/secrets/lockbox.py    |  3 +-
 .../src/airflow_shared/secrets_backend/base.py     |  3 +-
 .../sdk/execution_time/secrets/execution_api.py    |  4 +-
 19 files changed, 196 insertions(+), 41 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
index db78f0e542c..5f9bcaee4d3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 from typing import Annotated
 
 from fastapi import Depends, HTTPException, Query, status
-from sqlalchemy import select
+from sqlalchemy import delete, select
 
 from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
 from airflow.api_fastapi.common.parameters import (
@@ -62,7 +62,11 @@ def delete_variable(
     session: SessionDep,
 ):
     """Delete a variable entry."""
-    if Variable.delete(variable_key, session) == 0:
+    # Like the other endpoints (get, patch), we do not use 
Variable.delete/get/set here because these methods
+    # are intended to be used in task execution environment (execution API)
+    result = session.execute(delete(Variable).where(Variable.key == 
variable_key))
+    rows = getattr(result, "rowcount", 0) or 0
+    if rows == 0:
         raise HTTPException(
             status.HTTP_404_NOT_FOUND, f"The Variable with key: 
`{variable_key}` was not found"
         )
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/deps.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/deps.py
index d247a31f5f4..fce188d48ed 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/deps.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/deps.py
@@ -24,9 +24,15 @@ import structlog
 import svcs
 from fastapi import Depends, HTTPException, Request, status
 from fastapi.security import HTTPBearer
+from sqlalchemy import select
 
 from airflow.api_fastapi.auth.tokens import JWTValidator
+from airflow.api_fastapi.common.db.common import AsyncSessionDep
 from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+from airflow.configuration import conf
+from airflow.models import DagModel, TaskInstance
+from airflow.models.dagbundle import DagBundleModel
+from airflow.models.team import Team
 
 log = structlog.get_logger(logger_name=__name__)
 
@@ -95,3 +101,19 @@ JWTBearerDep: TIToken = Depends(JWTBearer())
 
 # This checks that the UUID in the url matches the one in the token for us.
 JWTBearerTIPathDep = Depends(JWTBearer(path_param_name="task_instance_id"))
+
+
+async def get_team_name_dep(session: AsyncSessionDep, token=JWTBearerDep) -> 
str | None:
+    """Return the team name associated to the task (if any)."""
+    if not conf.getboolean("core", "multi_team"):
+        return None
+
+    stmt = (
+        select(Team.name)
+        .select_from(TaskInstance)
+        .join(DagModel, DagModel.dag_id == TaskInstance.dag_id)
+        .join(DagBundleModel, DagBundleModel.name == DagModel.bundle_name)
+        .join(DagBundleModel.teams)
+        .where(TaskInstance.id == str(token.id))
+    )
+    return await session.scalar(stmt)
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
index d2f5d21349c..5621b6cd081 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import logging
+from typing import Annotated
 
 from fastapi import APIRouter, Depends, HTTPException, Path, Request, status
 
@@ -25,7 +26,7 @@ from airflow.api_fastapi.execution_api.datamodels.variable 
import (
     VariablePostBody,
     VariableResponse,
 )
-from airflow.api_fastapi.execution_api.deps import JWTBearerDep
+from airflow.api_fastapi.execution_api.deps import JWTBearerDep, 
get_team_name_dep
 from airflow.models.variable import Variable
 
 
@@ -61,13 +62,15 @@ log = logging.getLogger(__name__)
         status.HTTP_403_FORBIDDEN: {"description": "Task does not have access 
to the variable"},
     },
 )
-def get_variable(variable_key: str) -> VariableResponse:
+def get_variable(
+    variable_key: str, team_name: Annotated[str | None, 
Depends(get_team_name_dep)]
+) -> VariableResponse:
     """Get an Airflow Variable."""
     if not variable_key:
         raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Not Found")
 
     try:
-        variable_value = Variable.get(variable_key)
+        variable_value = Variable.get(variable_key, team_name=team_name)
     except KeyError:
         raise HTTPException(
             status.HTTP_404_NOT_FOUND,
@@ -88,12 +91,14 @@ def get_variable(variable_key: str) -> VariableResponse:
         status.HTTP_403_FORBIDDEN: {"description": "Task does not have access 
to the variable"},
     },
 )
-def put_variable(variable_key: str, body: VariablePostBody):
+def put_variable(
+    variable_key: str, body: VariablePostBody, team_name: Annotated[str | 
None, Depends(get_team_name_dep)]
+):
     """Set an Airflow Variable."""
     if not variable_key:
         raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Not Found")
 
-    Variable.set(key=variable_key, value=body.value, 
description=body.description)
+    Variable.set(key=variable_key, value=body.value, 
description=body.description, team_name=team_name)
     return {"message": "Variable successfully set"}
 
 
@@ -105,9 +110,9 @@ def put_variable(variable_key: str, body: VariablePostBody):
         status.HTTP_403_FORBIDDEN: {"description": "Task does not have access 
to the variable"},
     },
 )
-def delete_variable(variable_key: str):
+def delete_variable(variable_key: str, team_name: Annotated[str | None, 
Depends(get_team_name_dep)]):
     """Delete an Airflow Variable."""
     if not variable_key:
         raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Not Found")
 
-    Variable.delete(key=variable_key)
+    Variable.delete(key=variable_key, team_name=team_name)
diff --git a/airflow-core/src/airflow/cli/commands/team_command.py 
b/airflow-core/src/airflow/cli/commands/team_command.py
index 6a187483bc3..c93013d177a 100644
--- a/airflow-core/src/airflow/cli/commands/team_command.py
+++ b/airflow-core/src/airflow/cli/commands/team_command.py
@@ -19,6 +19,8 @@
 
 from __future__ import annotations
 
+import re
+
 from sqlalchemy import func, select
 from sqlalchemy.exc import IntegrityError
 
@@ -50,6 +52,8 @@ def _extract_team_name(args):
     team_name = args.name.strip()
     if not team_name:
         raise SystemExit("Team name cannot be empty")
+    if not re.match(r"^[a-zA-Z0-9_-]{3,50}$", team_name):
+        raise SystemExit("Invalid team name: must match regex 
^[a-zA-Z0-9_-]{3,50}$")
     return team_name
 
 
diff --git a/airflow-core/src/airflow/models/variable.py 
b/airflow-core/src/airflow/models/variable.py
index a6e0ea5cf49..1ccee483470 100644
--- a/airflow-core/src/airflow/models/variable.py
+++ b/airflow-core/src/airflow/models/variable.py
@@ -24,7 +24,7 @@ import sys
 import warnings
 from typing import TYPE_CHECKING, Any
 
-from sqlalchemy import Boolean, ForeignKey, Integer, String, Text, delete, 
select
+from sqlalchemy import Boolean, ForeignKey, Integer, String, Text, delete, 
or_, select
 from sqlalchemy.dialects.mysql import MEDIUMTEXT
 from sqlalchemy.orm import Mapped, declared_attr, reconstructor, synonym
 
@@ -139,6 +139,7 @@ class Variable(Base, LoggingMixin):
         key: str,
         default_var: Any = __NO_DEFAULT_SENTINEL,
         deserialize_json: bool = False,
+        team_name: str | None = None,
     ) -> Any:
         """
         Get a value for an Airflow Variable Key.
@@ -146,6 +147,7 @@ class Variable(Base, LoggingMixin):
         :param key: Variable Key
         :param default_var: Default value of the Variable if the Variable 
doesn't exist
         :param deserialize_json: Deserialize the value to a Python dict
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
         """
         # TODO: This is not the best way of having compat, but it's "better 
than erroring" for now. This still
         # means SQLA etc is loaded, but we can't avoid that unless/until we 
add import shims as a big
@@ -169,7 +171,12 @@ class Variable(Base, LoggingMixin):
 
             return var_val
 
-        var_val = Variable.get_variable_from_secrets(key=key)
+        if team_name and not conf.getboolean("core", "multi_team"):
+            raise ValueError(
+                "Multi-team mode is not configured in the Airflow environment 
but the task trying to access the variable belongs to a team"
+            )
+
+        var_val = Variable.get_variable_from_secrets(key=key, 
team_name=team_name)
         if var_val is None:
             if default_var is not cls.__NO_DEFAULT_SENTINEL:
                 return default_var
@@ -319,6 +326,7 @@ class Variable(Base, LoggingMixin):
         key: str,
         value: Any,
         serialize_json: bool = False,
+        team_name: str | None = None,
         session: Session | None = None,
     ) -> None:
         """
@@ -327,6 +335,7 @@ class Variable(Base, LoggingMixin):
         :param key: Variable Key
         :param value: Value to set for the Variable
         :param serialize_json: Serialize the value to a JSON string
+        :param team_name: Team name associated to the variable (if any)
         :param session: optional session, use if provided or create a new one
         """
         # TODO: This is not the best way of having compat, but it's "better 
than erroring" for now. This still
@@ -352,9 +361,14 @@ class Variable(Base, LoggingMixin):
             )
             return
 
+        if team_name and not conf.getboolean("core", "multi_team"):
+            raise ValueError(
+                "Multi-team mode is not configured in the Airflow environment. 
To assign a team to a variable, multi-mode must be enabled."
+            )
+
         Variable.check_for_write_conflict(key=key)
 
-        if Variable.get_variable_from_secrets(key=key) is None:
+        if Variable.get_variable_from_secrets(key=key, team_name=team_name) is 
None:
             raise KeyError(f"Variable {key} does not exist")
 
         ctx: contextlib.AbstractContextManager
@@ -364,7 +378,11 @@ class Variable(Base, LoggingMixin):
             ctx = create_session()
 
         with ctx as session:
-            obj = session.scalar(select(Variable).where(Variable.key == key))
+            obj = session.scalar(
+                select(Variable).where(
+                    Variable.key == key, or_(Variable.team_name == team_name, 
Variable.team_name.is_(None))
+                )
+            )
             if obj is None:
                 raise AttributeError(f"Variable {key} does not exist in the 
Database and cannot be updated.")
 
@@ -377,11 +395,12 @@ class Variable(Base, LoggingMixin):
             )
 
     @staticmethod
-    def delete(key: str, session: Session | None = None) -> int:
+    def delete(key: str, team_name: str | None = None, session: Session | None 
= None) -> int:
         """
         Delete an Airflow Variable for a given key.
 
         :param key: Variable Keys
+        :param team_name: Team name associated to the task trying to delete 
the variable (if any)
         :param session: optional session, use if provided or create a new one
         """
         # TODO: This is not the best way of having compat, but it's "better 
than erroring" for now. This still
@@ -404,6 +423,11 @@ class Variable(Base, LoggingMixin):
             )
             return 1
 
+        if team_name and not conf.getboolean("core", "multi_team"):
+            raise ValueError(
+                "Multi-team mode is not configured in the Airflow environment 
but the task trying to delete the variable belongs to a team"
+            )
+
         ctx: contextlib.AbstractContextManager
         if session is not None:
             ctx = contextlib.nullcontext(session)
@@ -411,7 +435,11 @@ class Variable(Base, LoggingMixin):
             ctx = create_session()
 
         with ctx as session:
-            result = session.execute(delete(Variable).where(Variable.key == 
key))
+            result = session.execute(
+                delete(Variable).where(
+                    Variable.key == key, or_(Variable.team_name == team_name, 
Variable.team_name.is_(None))
+                )
+            )
             rows = getattr(result, "rowcount", 0) or 0
             SecretCache.invalidate_variable(key)
             return rows
@@ -458,25 +486,28 @@ class Variable(Base, LoggingMixin):
             return None
 
     @staticmethod
-    def get_variable_from_secrets(key: str) -> str | None:
+    def get_variable_from_secrets(key: str, team_name: str | None = None) -> 
str | None:
         """
         Get Airflow Variable by iterating over all Secret Backends.
 
         :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
         :return: Variable Value
         """
-        # check cache first
-        # enabled only if SecretCache.init() has been called first
-        try:
-            return SecretCache.get_variable(key)
-        except SecretCache.NotPresentException:
-            pass  # continue business
+        # Disable cache if the variable belongs to a team. We might enable it 
later
+        if not team_name:
+            # check cache first
+            # enabled only if SecretCache.init() has been called first
+            try:
+                return SecretCache.get_variable(key)
+            except SecretCache.NotPresentException:
+                pass  # continue business
 
         var_val = None
         # iterate over backends if not in cache (or expired)
         for secrets_backend in ensure_secrets_loaded():
             try:
-                var_val = secrets_backend.get_variable(key=key)
+                var_val = secrets_backend.get_variable(key=key, 
team_name=team_name)
                 if var_val is not None:
                     break
             except Exception:
diff --git a/airflow-core/src/airflow/secrets/environment_variables.py 
b/airflow-core/src/airflow/secrets/environment_variables.py
index e6bd72d4e5a..1973152b426 100644
--- a/airflow-core/src/airflow/secrets/environment_variables.py
+++ b/airflow-core/src/airflow/secrets/environment_variables.py
@@ -33,11 +33,18 @@ class EnvironmentVariablesBackend(BaseSecretsBackend):
     def get_conn_value(self, conn_id: str) -> str | None:
         return os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
 
-    def get_variable(self, key: str) -> str | None:
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
         """
         Get Airflow Variable from Environment Variable.
 
         :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
         :return: Variable Value
         """
+        if team_name and (
+            team_var := 
os.environ.get(f"{VAR_ENV_PREFIX}_{team_name.upper()}___" + key.upper())
+        ):
+            # Format to set a team specific variable: 
AIRFLOW_VAR__<TEAM_NAME>___<VAR_KEY>
+            return team_var
+
         return os.environ.get(VAR_ENV_PREFIX + key.upper())
diff --git a/airflow-core/src/airflow/secrets/local_filesystem.py 
b/airflow-core/src/airflow/secrets/local_filesystem.py
index efcd3b69b0c..a6780b609dc 100644
--- a/airflow-core/src/airflow/secrets/local_filesystem.py
+++ b/airflow-core/src/airflow/secrets/local_filesystem.py
@@ -349,7 +349,7 @@ class LocalFilesystemBackend(BaseSecretsBackend, 
LoggingMixin):
             return self._local_connections[conn_id]
         return None
 
-    def get_variable(self, key: str) -> str | None:
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
         return self._local_variables.get(key)
 
     def get_config(self, key: str) -> str | None:
diff --git a/airflow-core/src/airflow/secrets/metastore.py 
b/airflow-core/src/airflow/secrets/metastore.py
index 21a26104dde..4ba3082ac31 100644
--- a/airflow-core/src/airflow/secrets/metastore.py
+++ b/airflow-core/src/airflow/secrets/metastore.py
@@ -21,7 +21,7 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from sqlalchemy import select
+from sqlalchemy import or_, select
 
 from airflow.secrets import BaseSecretsBackend
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -51,17 +51,24 @@ class MetastoreBackend(BaseSecretsBackend):
         return conn
 
     @provide_session
-    def get_variable(self, key: str, session: Session = NEW_SESSION) -> str | 
None:
+    def get_variable(
+        self, key: str, team_name: str | None = None, session: Session = 
NEW_SESSION
+    ) -> str | None:
         """
         Get Airflow Variable from Metadata DB.
 
         :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
         :param session: SQLAlchemy Session
         :return: Variable Value
         """
         from airflow.models import Variable
 
-        var_value = session.scalar(select(Variable).where(Variable.key == 
key).limit(1))
+        var_value = session.scalar(
+            select(Variable)
+            .where(Variable.key == key, or_(Variable.team_name == team_name, 
Variable.team_name.is_(None)))
+            .limit(1)
+        )
         session.expunge_all()
         if var_value:
             return var_value.val
diff --git a/airflow-core/tests/unit/always/test_secrets.py 
b/airflow-core/tests/unit/always/test_secrets.py
index aa02573e7d0..7ef36733517 100644
--- a/airflow-core/tests/unit/always/test_secrets.py
+++ b/airflow-core/tests/unit/always/test_secrets.py
@@ -142,8 +142,8 @@ class TestVariableFromSecrets:
 
         Variable.get_variable_from_secrets("fake_var_key")
 
-        mock_meta_get.assert_called_once_with(key="fake_var_key")
-        mock_env_get.assert_called_once_with(key="fake_var_key")
+        mock_meta_get.assert_called_once_with(key="fake_var_key", 
team_name=None)
+        mock_env_get.assert_called_once_with(key="fake_var_key", 
team_name=None)
 
     @mock.patch("airflow.secrets.metastore.MetastoreBackend.get_variable")
     
@mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable")
@@ -154,7 +154,7 @@ class TestVariableFromSecrets:
         """
         mock_env_get.return_value = "something"
         Variable.get_variable_from_secrets("fake_var_key")
-        mock_env_get.assert_called_once_with(key="fake_var_key")
+        mock_env_get.assert_called_once_with(key="fake_var_key", 
team_name=None)
         mock_meta_get.assert_not_called()
 
     def test_backend_fallback_to_default_var(self):
@@ -194,13 +194,13 @@ class TestVariableFromSecrets:
         mock_meta_get.return_value = None
 
         assert Variable.get(key="MYVAR") == "a_venv_value"
-        mock_secret_get.assert_called_with(key="MYVAR")
+        mock_secret_get.assert_called_with(key="MYVAR", team_name=None)
         mock_meta_get.assert_not_called()
 
         mock_secret_get.return_value = None
         mock_meta_get.return_value = "a_metastore_value"
         assert Variable.get(key="not_myvar") == "a_metastore_value"
-        mock_meta_get.assert_called_once_with(key="not_myvar")
+        mock_meta_get.assert_called_once_with(key="not_myvar", team_name=None)
 
         mock_secret_get.return_value = "a_secret_value"
         assert Variable.get(key="not_myvar") == "a_secret_value"
diff --git a/airflow-core/tests/unit/cli/commands/test_team_command.py 
b/airflow-core/tests/unit/cli/commands/test_team_command.py
index c9fe75be02d..046e58de188 100644
--- a/airflow-core/tests/unit/cli/commands/test_team_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_team_command.py
@@ -81,6 +81,10 @@ class TestCliTeams:
         with pytest.raises(SystemExit, match="Team name cannot be empty"):
             team_command.team_create(self.parser.parse_args(["teams", 
"create", ""]))
 
+    def test_team_create_invalid_name(self):
+        with pytest.raises(SystemExit, match="Invalid team name"):
+            team_command.team_create(self.parser.parse_args(["teams", 
"create", "test with space"]))
+
     def test_team_create_whitespace_name(self):
         """Test team creation with whitespace-only name."""
         with pytest.raises(SystemExit, match="Team name cannot be empty"):
diff --git a/airflow-core/tests/unit/models/test_variable.py 
b/airflow-core/tests/unit/models/test_variable.py
index 12d63c1ab77..3e2e3cce1f3 100644
--- a/airflow-core/tests/unit/models/test_variable.py
+++ b/airflow-core/tests/unit/models/test_variable.py
@@ -104,6 +104,24 @@ class TestVariable:
             assert test_var.val == test_value
             assert Fernet(key2).decrypt(test_var._val.encode()) == 
test_value.encode()
 
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_get_variable_with_team(self, testing_team, session):
+        Variable.set(key="key", value="value", team_name=testing_team.name, 
session=session)
+        result = Variable.get(key="key", team_name=testing_team.name)
+        assert result == "value"
+
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_get_global_variable_with_team(self, testing_team, session):
+        Variable.set(key="key", value="value", session=session)
+        result = Variable.get(key="key", team_name=testing_team.name)
+        assert result == "value"
+
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_get_team_variable_without_team(self, testing_team, session):
+        Variable.set(key="key", value="value", team_name=testing_team.name, 
session=session)
+        with pytest.raises(KeyError):
+            Variable.get(key="key")
+
     def test_variable_set_get_round_trip(self):
         Variable.set("tested_var_set_id", "Monday morning breakfast")
         assert Variable.get("tested_var_set_id") == "Monday morning breakfast"
@@ -192,6 +210,24 @@ class TestVariable:
         assert test_var.val == "value2"
         assert test_var.description == "a test variable"
 
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_variable_update_with_team(self, testing_team, session):
+        Variable.set(key="test_key", value="value1", 
team_name=testing_team.name, session=session)
+        Variable.update(key="test_key", value="value2", 
team_name=testing_team.name, session=session)
+        assert Variable.get("test_key", team_name=testing_team.name) == 
"value2"
+
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_variable_update_with_team_global(self, testing_team, session):
+        Variable.set(key="test_key", value="value1", session=session)
+        Variable.update(key="test_key", value="value2", 
team_name=testing_team.name, session=session)
+        assert Variable.get("test_key", team_name=testing_team.name) == 
"value2"
+
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_variable_update_with_wrong_team(self, testing_team, session):
+        Variable.set(key="test_key", value="value1", 
team_name=testing_team.name, session=session)
+        with pytest.raises(KeyError):
+            Variable.update(key="test_key", value="value2", session=session)
+
     def test_set_variable_sets_description(self, session):
         Variable.set(key="key", value="value", description="a test variable", 
session=session)
         test_var = session.query(Variable).filter(Variable.key == "key").one()
@@ -276,6 +312,32 @@ class TestVariable:
         with pytest.raises(KeyError):
             Variable.get(key)
 
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_variable_delete_with_team(self, testing_team, session):
+        key = "tested_var_delete"
+        value = "to be deleted"
+
+        # No-op if the variable doesn't exist
+        Variable.delete(key=key, team_name=testing_team.name, session=session)
+        with pytest.raises(KeyError):
+            Variable.get(key)
+
+        # Delete same team variable
+        Variable.set(key=key, value=value, team_name=testing_team.name, 
session=session)
+        Variable.delete(key=key, team_name=testing_team.name, session=session)
+        with pytest.raises(KeyError):
+            Variable.get(key)
+
+        # Delete global variable
+        Variable.set(key=key, value=value, session=session)
+        Variable.delete(key=key, team_name=testing_team.name, session=session)
+        with pytest.raises(KeyError):
+            Variable.get(key)
+
+        # Attempt to delete a team variable from another one
+        Variable.set(key=key, value=value, team_name=testing_team.name, 
session=session)
+        assert Variable.delete(key=key, session=session) == 0
+
     def test_masking_from_db(self, session):
         """Test secrets are masked when loaded directly from the DB"""
         # Normally people will use `Variable.get`, but just in case, catch 
direct DB access too
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py 
b/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py
index 47f99afd787..0d8e80ae4fb 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py
@@ -225,11 +225,12 @@ class SecretsManagerBackend(BaseSecretsBackend, 
LoggingMixin):
             return standardized_secret
         return secret
 
-    def get_variable(self, key: str) -> str | None:
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
         """
         Get Airflow Variable.
 
         :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
         :return: Variable Value
         """
         if self.variables_prefix is None:
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/secrets/systems_manager.py 
b/providers/amazon/src/airflow/providers/amazon/aws/secrets/systems_manager.py
index 25fb1606f2b..2a1228116da 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/secrets/systems_manager.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/secrets/systems_manager.py
@@ -143,11 +143,12 @@ class 
SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
 
         return self._get_secret(self.connections_prefix, conn_id, 
self.connections_lookup_pattern)
 
-    def get_variable(self, key: str) -> str | None:
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
         """
         Get Airflow Variable.
 
         :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
         :return: Variable Value
         """
         if self.variables_prefix is None:
diff --git 
a/providers/google/src/airflow/providers/google/cloud/secrets/secret_manager.py 
b/providers/google/src/airflow/providers/google/cloud/secrets/secret_manager.py
index c8cdfc6c53b..428f31d064c 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/secrets/secret_manager.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/secrets/secret_manager.py
@@ -160,11 +160,12 @@ class CloudSecretManagerBackend(BaseSecretsBackend, 
LoggingMixin):
 
         return self._get_secret(self.connections_prefix, conn_id)
 
-    def get_variable(self, key: str) -> str | None:
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
         """
         Get Airflow Variable from Environment Variable.
 
         :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
         :return: Variable Value
         """
         if self.variables_prefix is None:
diff --git 
a/providers/hashicorp/src/airflow/providers/hashicorp/secrets/vault.py 
b/providers/hashicorp/src/airflow/providers/hashicorp/secrets/vault.py
index 91dcea39e33..627feb137c4 100644
--- a/providers/hashicorp/src/airflow/providers/hashicorp/secrets/vault.py
+++ b/providers/hashicorp/src/airflow/providers/hashicorp/secrets/vault.py
@@ -208,11 +208,12 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
 
         return Connection(conn_id, **response)
 
-    def get_variable(self, key: str) -> str | None:
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
         """
         Get Airflow Variable.
 
         :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
         :return: Variable Value retrieved from the vault
         """
         mount_point, variable_key = self._parse_path(key)
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/secrets/key_vault.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/secrets/key_vault.py
index 423e5bc247c..64590b6bae6 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/secrets/key_vault.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/secrets/key_vault.py
@@ -155,11 +155,12 @@ class AzureKeyVaultBackend(BaseSecretsBackend, 
LoggingMixin):
 
         return self._get_secret(self.connections_prefix, conn_id)
 
-    def get_variable(self, key: str) -> str | None:
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
         """
         Get an Airflow Variable from an Azure Key Vault secret.
 
         :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
         :return: Variable Value
         """
         if self.variables_prefix is None:
diff --git a/providers/yandex/src/airflow/providers/yandex/secrets/lockbox.py 
b/providers/yandex/src/airflow/providers/yandex/secrets/lockbox.py
index d65131ab2cb..9cbf7321294 100644
--- a/providers/yandex/src/airflow/providers/yandex/secrets/lockbox.py
+++ b/providers/yandex/src/airflow/providers/yandex/secrets/lockbox.py
@@ -160,11 +160,12 @@ class LockboxSecretBackend(BaseSecretsBackend, 
LoggingMixin):
 
         return self._get_secret_value(self.connections_prefix, conn_id)
 
-    def get_variable(self, key: str) -> str | None:
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
         """
         Return value for Airflow Variable.
 
         :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
         :return: Variable Value
         """
         if self.variables_prefix is None:
diff --git a/shared/secrets_backend/src/airflow_shared/secrets_backend/base.py 
b/shared/secrets_backend/src/airflow_shared/secrets_backend/base.py
index 0ae361ffd32..c75e274c598 100644
--- a/shared/secrets_backend/src/airflow_shared/secrets_backend/base.py
+++ b/shared/secrets_backend/src/airflow_shared/secrets_backend/base.py
@@ -44,11 +44,12 @@ class BaseSecretsBackend(ABC):
         """
         raise NotImplementedError
 
-    def get_variable(self, key: str) -> str | None:
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
         """
         Return value for Airflow Variable.
 
         :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access 
the variable (if any)
         :return: Variable Value
         """
         raise NotImplementedError()
diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py 
b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
index f9bee875f83..18f80f29dcf 100644
--- a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
+++ b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
@@ -88,11 +88,13 @@ class ExecutionAPISecretsBackend(BaseSecretsBackend):
             # to allow fallback to other backends
             return None
 
-    def get_variable(self, key: str) -> str | None:
+    def get_variable(self, key: str, team_name: str | None = None) -> str | 
None:
         """
         Return variable value by routing through SUPERVISOR_COMMS.
 
         :param key: Variable key
+        :param team_name: Name of the team associated to the task trying to access 
the variable.
+            Unused here because the team is inferred from the task ID 
provided in the execution API JWT token.
         :return: Variable value or None if not found
         """
         from airflow.sdk.execution_time.comms import ErrorResponse, 
GetVariable, VariableResult

Reply via email to