Hello everyone,

Jarek asked for a proposal on #60125 [1] before implementing access control for the Execution API's resource endpoints (variables, connections, XComs), so here it is.

After going through the codebase, I think this is really about completing AIP-67's [2] multi-team boundary enforcement rather than introducing a new security model. Most of the infrastructure already exists. What's missing are the actual authorization checks.

The current state:

The Execution API has three authorization stubs that always return True:

- has_variable_access() in execution_api/routes/variables.py
- has_connection_access() in execution_api/routes/connections.py
- has_xcom_access() in execution_api/routes/xcoms.py

All three have a "# TODO: Placeholder for actual implementation" comment.

For variables and connections, vincbeck's data-layer team scoping (#58905 [4], #59476 [5]) already prevents cross-team data retrieval in practice. A cross-team request returns a 404 rather than the resource. So the data isolation is there, but the auth stubs don't reject these requests early with a proper 403, and there's no second layer of protection at the auth check itself.

For XComs, the situation is different. There is no isolation at any layer. XCom routes take dag_id, run_id, and task_id directly from URL path parameters with no validation against the calling task's identity. A task in Team-A's bundle can currently read and write Team-B's XComs.

There's already a get_team_name_dep() function in deps.py that resolves a task's team via TaskInstance -> DagModel -> DagBundleModel -> Team in a single join query. The variable and connection endpoints already use it. XCom routes don't use it at all.

Proposed approach:

I'm thinking of this in two parts:

1) Team boundary checks for variables and connections

Fill the auth stubs with team boundary checks. For reference, the Core API does this in security.py. requires_access_variable() resolves the resource's team via Variable.get_team_name(key), wraps it in VariableDetails, and passes it to auth_manager.is_authorized_variable(method, details, user). The auth manager then checks team membership.

For the Execution API, the flow would be similar but without going through the auth manager (I'll explain why below):

    variable_key -> Variable.get_team_name(key) -> resource_team
    token.id -> get_team_name_dep() -> task_team
    Deny if resource_team != task_team (when both are non-None)

When core.multi_team is disabled, get_team_name_dep returns None and the check is skipped, so current single-team behavior stays exactly the same.

2) XCom authorization

This is the harder part. For writes, I think we should verify the calling task is writing its own XComs -- the task identity from the JWT should match the dag_id/task_id in the URL path. For reads, enforce team boundary so a task can only read XComs from tasks within the same team. This would allow cross-DAG xcom_pull within a team (which people already do) while preventing cross-team access.

To avoid a DB lookup on every request, I'd propose adding dag_id to the JWT claims at generation time. The dag_id is already on the TaskInstance schema in ExecuteTask.make() (workloads.py:142). The JWTReissueMiddleware already preserves all claims during token refresh, so this wouldn't break anything. Adding task_id and run_id to the token could be done as a follow-up -- there's a TODO at xcoms.py:315 about eventually deriving these from the token instead of the URL.

I'm not proposing to add team_name to the token. It's not available on the TaskInstance schema at generation time. Resolving it requires a DB join through DagModel -> DagBundleModel -> Team, which would slow down the scheduler's task queuing path. Better to resolve it at request time via get_team_name_dep.

Why not go through BaseAuthManager?

One design question I want to raise: the Execution API auth stubs currently don't call BaseAuthManager.is_authorized_*(), and I think they probably shouldn't.
The BaseAuthManager interface is designed around human identity (BaseUser with roles and team memberships), but the Execution API operates on task identity (TIToken with a UUID). These are very different things. A task doesn't have a "role" in the RBAC sense; it has a team derived from its DAG's bundle.

I'm leaning toward keeping the authorization logic directly in the has_*_access dependency functions, using get_team_name_dep for team resolution. This keeps the Execution API auth simple and avoids tying task authorization to the human auth manager. But I'd like to hear if others think we should instead extend BaseAuthManager with task-identity-aware methods.

What about single-team deployments?

When core.multi_team=False (the default for most deployments), the team boundary checks would be skipped entirely for variables and connections. For XComs, I think write ownership verification (task can only write its own XComs) is worth keeping regardless of multi-team mode -- it's more of a correctness concern than an authorization one. But I can also see the argument for a complete no-op when multi_team is off to keep things simple.

Out of scope:

AIP-72 [3] mentions three possible authorization models: pre-declaration (DAGs declare required resources), runtime request with deployment-level policy, and OPA integration via WASM bindings. I'm not trying to address any of those here. The team-boundary enforcement is the base that all three future models need.

Implementation plan:

1. Add dag_id claim to JWT token generation in workloads.py
2. Implement has_variable_access team boundary check
3. Implement has_connection_access team boundary check
4. Implement has_xcom_access with write ownership + team boundary
5. Add XCom team resolution (XCom routes currently have no get_team_name_dep usage)
6. Tests for all authorization scenarios including cross-team denial
7. Documentation update for multi-team authorization behavior

This should be a fairly small change -- mostly filling in the existing stubs with actual checks. Let me know what you think.

Anish

[1] https://github.com/apache/airflow/issues/60125#issuecomment-3712218766
[2] https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-67+Multi-team+deployment+of+Airflow+components
[3] https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-72+Task+Execution+Interface+aka+Task+SDK
[4] https://github.com/apache/airflow/pull/58905
[5] https://github.com/apache/airflow/pull/59476
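
To make the two checks in the proposal a bit more concrete, here is a rough, framework-free sketch of the decision logic only. It is not the actual Airflow code: team_boundary_allows, TaskIdentity, xcom_access_allowed and the WRITE_METHODS set are hypothetical names for illustration, and it assumes the caller's team and the target's team have already been resolved elsewhere (e.g. via get_team_name_dep), with None meaning "no team" or multi-team disabled.

```python
from dataclasses import dataclass
from typing import Optional

# Assumption: which HTTP methods count as "writes" is illustrative only.
WRITE_METHODS = frozenset({"POST", "PUT", "PATCH", "DELETE"})


def team_boundary_allows(resource_team: Optional[str], task_team: Optional[str]) -> bool:
    """Part 1: deny only when both teams are known and they differ.

    If either side is None (multi-team disabled, or a resource without a
    team), the check is skipped and access is allowed.
    """
    if resource_team is None or task_team is None:
        return True
    return resource_team == task_team


@dataclass(frozen=True)
class TaskIdentity:
    """Hypothetical stand-in for the calling task's identity.

    dag_id would come from the proposed JWT claim; task_id and team are
    assumed to be resolved at request time.
    """

    dag_id: str
    task_id: str
    team: Optional[str]


def xcom_access_allowed(
    caller: TaskIdentity,
    method: str,
    target_dag_id: str,
    target_task_id: str,
    target_team: Optional[str],
) -> bool:
    """Part 2: write ownership for writes, team boundary for reads."""
    if method.upper() in WRITE_METHODS:
        # A task may only write its own XComs, regardless of multi-team mode.
        return caller.dag_id == target_dag_id and caller.task_id == target_task_id
    # Reads: cross-DAG pulls are fine within a team, denied across teams.
    return team_boundary_allows(target_team, caller.team)
```

In the real dependency functions a False result would presumably translate into a 403 response; this sketch deliberately leaves the HTTP layer out. Note that write ownership is checked even when both teams are None, which matches the "keep it regardless of multi-team mode" option discussed above rather than the complete no-op alternative.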
