Hello everyone,

Jarek asked for a proposal on #60125 [1] before implementing access
control for the Execution API's resource endpoints (variables,
connections, XComs), so here it is.

After going through the codebase, I think this is really about
completing AIP-67's [2] multi-team boundary enforcement rather than
introducing a new security model. Most of the infrastructure already
exists. What's missing are the actual authorization checks.

The current state:

The Execution API has three authorization stubs that always return True:

- has_variable_access() in execution_api/routes/variables.py
- has_connection_access() in execution_api/routes/connections.py
- has_xcom_access() in execution_api/routes/xcoms.py

All three have a "# TODO: Placeholder for actual implementation" comment.

For variables and connections, vincbeck's data-layer team scoping
(#58905 [4], #59476 [5]) already prevents cross-team data retrieval in
practice. A cross-team request returns a 404 rather than the resource.
So the data isolation is there, but the auth stubs don't reject these
requests early with a proper 403, and there's no second layer of
protection at the auth check itself.

For XComs, the situation is different. There is no isolation at any
layer. XCom routes take dag_id, run_id, and task_id directly from URL
path parameters with no validation against the calling task's
identity. A task in Team-A's bundle can currently read and write
Team-B's XComs.

There's already a get_team_name_dep() function in deps.py that
resolves a task's team via TaskInstance -> DagModel -> DagBundleModel
-> Team in a single join query. The variable and connection endpoints
already use it. XCom routes don't use it at all.

Proposed approach:

I'm thinking of this in two parts:

1) Team boundary checks for variables and connections

Fill the auth stubs with team boundary checks. For reference, the Core
API does this in security.py. requires_access_variable() resolves the
resource's team via Variable.get_team_name(key), wraps it in
VariableDetails, and passes it to
auth_manager.is_authorized_variable(method, details, user). The auth
manager then checks team membership.

For the Execution API, the flow would be similar but without going
through the auth manager (I'll explain why below):

variable_key -> Variable.get_team_name(key) -> resource_team
token.id -> get_team_name_dep() -> task_team
Deny if resource_team != task_team (when both are non-None)

When core.multi_team is disabled, get_team_name_dep returns None and
the check is skipped, so current single-team behavior stays exactly
the same.

2) XCom authorization

This is the harder part. For writes, I think we should verify the
calling task is writing its own XComs -- the task identity from the
JWT should match the dag_id/task_id in the URL path. For reads,
enforce team boundary so a task can only read XComs from tasks within
the same team. This would allow cross-DAG xcom_pull within a team
(which people already do) while preventing cross-team access.

To avoid a DB lookup on every request, I'd propose adding dag_id to
the JWT claims at generation time. The dag_id is already on the
TaskInstance schema in ExecuteTask.make() (workloads.py:142). The
JWTReissueMiddleware already preserves all claims during token
refresh, so this wouldn't break anything. Adding task_id and run_id to
the token could be done as a follow-up -- there's a TODO at
xcoms.py:315 about eventually deriving these from the token instead of
the URL.

I'm not proposing to add team_name to the token. It's not available on
the TaskInstance schema at generation time. Resolving it requires a DB
join through DagModel -> DagBundleModel -> Team, which would slow down
the scheduler's task queuing path. Better to resolve it at request
time via get_team_name_dep.

Why not go through BaseAuthManager?

One design question I want to raise: the Execution API auth stubs
currently don't call BaseAuthManager.is_authorized_*(), and I think
they probably shouldn't. The BaseAuthManager interface is designed
around human identity (BaseUser with roles and team memberships), but
the Execution API operates on task identity (TIToken with a UUID).
These are very different things. A task doesn't have a "role" in the
RBAC sense, it has a team derived from its DAG's bundle.

I'm leaning toward keeping the authorization logic directly in the
has_*_access dependency functions, using get_team_name_dep for team
resolution. This keeps the Execution API auth simple and avoids tying
task authorization to the human auth manager. But I'd like to hear if
others think we should instead extend BaseAuthManager with
task-identity-aware methods.

What about single-team deployments?

When core.multi_team=False (the default for most deployments), the
team boundary checks would be skipped entirely for variables and
connections. For XComs, I think write ownership verification (task can
only write its own XComs) is worth keeping regardless of multi-team
mode -- it's more of a correctness concern than an authorization one.
But I can also see the argument for a complete no-op when multi_team
is off to keep things simple.

Out of scope:

AIP-72 [3] mentions three possible authorization models:
pre-declaration (DAGs declare required resources), runtime request
with deployment-level policy, and OPA integration via WASM bindings.
I'm not trying to address any of those here. The team-boundary
enforcement is the base that all three future models need.

Implementation plan:

1. Add dag_id claim to JWT token generation in workloads.py
2. Implement has_variable_access team boundary check
3. Implement has_connection_access team boundary check
4. Implement has_xcom_access with write ownership + team boundary
5. Add XCom team resolution (XCom routes currently have no
get_team_name_dep usage)
6. Tests for all authorization scenarios including cross-team denial
7. Documentation update for multi-team authorization behavior

This should be a fairly small change -- mostly filling in the existing
stubs with actual checks.

Let me know what you think.

Anish

[1] https://github.com/apache/airflow/issues/60125#issuecomment-3712218766
[2] 
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-67+Multi-team+deployment+of+Airflow+components
[3] 
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-72+Task+Execution+Interface+aka+Task+SDK
[4] https://github.com/apache/airflow/pull/58905
[5] https://github.com/apache/airflow/pull/59476

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to