This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c82c945e430 Apply consumer team filtering (#68025)
c82c945e430 is described below
commit c82c945e430a54be87ab4926b98ec5d93fa8045f
Author: Vincent <[email protected]>
AuthorDate: Thu Jun 4 13:00:08 2026 -0700
Apply consumer team filtering (#68025)
---
airflow-core/src/airflow/assets/manager.py | 79 ++++++++++++++-----
airflow-core/tests/unit/assets/test_manager.py | 104 +++++++++++++++++++++++++
2 files changed, 165 insertions(+), 18 deletions(-)
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index 7c12c31f979..2df2e06e82f 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -40,6 +40,7 @@ from airflow.models.asset import (
DagScheduleAssetReference,
DagScheduleAssetUriReference,
PartitionedAssetKeyLog,
+ TaskOutletAssetReference,
)
from airflow.models.log import Log
from airflow.utils.helpers import is_container
@@ -181,15 +182,24 @@ class AssetManager(LoggingMixin):
source_is_api: bool,
*,
session: Session,
+ allow_consumer_teams: list[str] | None = None,
+ allow_global_consumers: bool = True,
) -> set[DagModel]:
"""
Filter consuming DAGs based on team membership when multi_team is enabled.
+ Both producer-team filtering (consumer decides which producers it accepts) and
+ consumer-team filtering (producer decides which consumers may receive its events)
+ must pass for a DAG to be queued (logical AND).
+
:param dags_to_queue: set of DagModel instances to potentially queue.
:param source_teams: set of team names the source belongs to. Empty set means teamless.
:param asset_model: the AssetModel whose scheduled_dags carry allow_producer_teams.
:param source_is_api: True if the event was triggered via the REST API (not a DAG task).
:param session: SQLAlchemy session.
+ :param allow_consumer_teams: list of team names allowed to consume. Empty/None means no filtering.
+ :param allow_global_consumers: whether teamless consumers are allowed. Only applies when
+ allow_consumer_teams is non-empty.
"""
if not conf.getboolean("core", "multi_team"):
return dags_to_queue
@@ -212,35 +222,46 @@ class AssetManager(LoggingMixin):
ref.dag_id: ref.allow_global_producers for ref in asset_model.scheduled_dags
}
+ has_consumer_team_filter = bool(allow_consumer_teams)
+
filtered = set()
for dag in dags_to_queue:
consumer_team = dag_id_to_team.get(dag.dag_id)
+ # --- Producer-team filtering (consumer-side control) ---
+ producer_pass = False
if consumer_team is None:
# Teamless consumer accepts events from any source
- filtered.add(dag)
- continue
-
- if is_teamless_source:
+ producer_pass = True
+ elif is_teamless_source:
if source_is_api:
# Teamless API user can only trigger teamless consumers
- continue
- # Teamless DAG producer — check allow_global_producers
- if dag_id_to_allow_global.get(dag.dag_id, True):
- filtered.add(dag)
- continue
-
- if consumer_team in source_teams:
+ producer_pass = False
+ else:
+ # Teamless DAG producer — check allow_global_producers
+ producer_pass = dag_id_to_allow_global.get(dag.dag_id, True)
+ elif consumer_team in source_teams:
# Same team
- filtered.add(dag)
- continue
+ producer_pass = True
+ else:
+ allow_producer_teams = dag_id_to_allow_teams.get(dag.dag_id, [])
+ if source_teams & set(allow_producer_teams):
+ # Cross-team via allow_producer_teams
+ producer_pass = True
- allow_producer_teams = dag_id_to_allow_teams.get(dag.dag_id, [])
- if source_teams & set(allow_producer_teams):
- # Cross-team via allow_producer_teams
- filtered.add(dag)
+ if not producer_pass:
continue
+ # --- Consumer-team filtering (producer-side control) ---
+ if has_consumer_team_filter and allow_consumer_teams is not None:
+ if consumer_team is None:
+ if not allow_global_consumers:
+ continue
+ elif consumer_team not in allow_consumer_teams:
+ continue
+
+ filtered.add(dag)
+
return filtered
@classmethod
@@ -255,6 +276,8 @@ class AssetManager(LoggingMixin):
partition_key: str | None = None,
source_is_api: bool = False,
api_user_teams: set[str] | None = None,
+ api_allow_consumer_teams: list[str] | None = None,
+ api_allow_global_consumers: bool = True,
**kwargs,
) -> AssetEvent | None:
"""
@@ -266,14 +289,20 @@ class AssetManager(LoggingMixin):
When multi_team mode is enabled, team-based filtering is applied to determine which
consumer DAGs should be queued:
- For DAG-produced events (task_instance is set), source teams are resolved automatically
- from the producing DAG's bundle.
+ from the producing DAG's bundle. Consumer-team filtering is resolved from the
+ TaskOutletAssetReference for the producing task.
- For API-produced events (source_is_api=True), ``api_user_teams`` must be provided explicitly.
+ Consumer-team filtering uses ``api_allow_consumer_teams`` and ``api_allow_global_consumers``.
:param source_is_api: True if the event originates from the REST API rather than
a DAG task execution.
:param api_user_teams: Teams of the API user triggering the event. Only used when
source_is_api=True. Ignored when task_instance is provided (teams are resolved
from the DAG's bundle instead).
+ :param api_allow_consumer_teams: Consumer teams allowed by an API-triggered event.
+ Only used when source_is_api=True.
+ :param api_allow_global_consumers: Whether teamless consumers are allowed for an
+ API-triggered event. Only used when source_is_api=True. Defaults to True.
"""
from airflow.models.dag import DagModel
@@ -378,14 +407,28 @@ class AssetManager(LoggingMixin):
if task_instance:
team_name = DagModel.get_team_name(task_instance.dag_id, session=session)
resolved_source_teams = {team_name} if team_name else set()
+ # Resolve consumer-team filtering from the outlet reference
+ outlet_ref = session.scalar(
+ select(TaskOutletAssetReference).where(
+ TaskOutletAssetReference.dag_id == task_instance.dag_id,
+ TaskOutletAssetReference.task_id == task_instance.task_id,
+ TaskOutletAssetReference.asset_id == asset_model.id,
+ )
+ )
+ resolved_consumer_teams = outlet_ref.allow_consumer_teams if outlet_ref else None
+ resolved_global_consumers = outlet_ref.allow_global_consumers if outlet_ref else True
else:
resolved_source_teams = api_user_teams or set()
+ resolved_consumer_teams = api_allow_consumer_teams
+ resolved_global_consumers = api_allow_global_consumers
dags_to_queue = cls._filter_dags_by_team(
dags_to_queue=dags_to_queue,
source_teams=resolved_source_teams,
asset_model=asset_model,
source_is_api=source_is_api,
session=session,
+ allow_consumer_teams=resolved_consumer_teams,
+ allow_global_consumers=resolved_global_consumers,
)
log.debug("asset event added", asset_event=asset_event, dags_to_queue=dags_to_queue)
diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py
index 8f9d290a2b0..69439812ec0 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -581,3 +581,107 @@ class TestFilterDagsByTeam:
)
assert dag_with_team not in result
+
+ @conf_vars({("core", "multi_team"): "true"})
+ @pytest.mark.parametrize(
+ (
+ "team_mapping",
+ "source_teams",
+ "scheduled_dags",
+ "allow_consumer_teams",
+ "allow_global_consumers",
+ "expected_in",
+ ),
+ [
+ pytest.param(
+ {"dag1": "team_b"},
+ {"team_a"},
+ {"dag1": ["team_a"]},
+ ["team_a"],
+ True,
+ False,
+ id="consumer_blocked_when_team_not_in_allow_consumer_teams",
+ ),
+ pytest.param(
+ {"dag1": "team_b"},
+ {"team_a"},
+ {"dag1": ["team_a"]},
+ ["team_a", "team_b"],
+ True,
+ True,
+ id="consumer_allowed_when_team_in_allow_consumer_teams",
+ ),
+ pytest.param(
+ {},
+ {"team_a"},
+ {},
+ ["team_b"],
+ True,
+ True,
+ id="teamless_consumer_passes_when_allow_global_consumers_true",
+ ),
+ pytest.param(
+ {},
+ {"team_a"},
+ {},
+ ["team_b"],
+ False,
+ False,
+ id="teamless_consumer_blocked_when_allow_global_consumers_false",
+ ),
+ pytest.param(
+ {"dag1": "team_b"},
+ {"team_a"},
+ {"dag1": []},
+ ["team_b"],
+ True,
+ False,
+ id="both_filters_must_pass_and_logic",
+ ),
+ pytest.param(
+ {"dag1": "team_b"},
+ {"team_a"},
+ {"dag1": ["team_a"]},
+ [],
+ True,
+ True,
+ id="empty_allow_consumer_teams_means_no_consumer_filtering",
+ ),
+ pytest.param(
+ {"dag1": "team_b"},
+ {"team_a"},
+ {"dag1": ["team_a"]},
+ None,
+ True,
+ True,
+ id="none_allow_consumer_teams_means_no_consumer_filtering",
+ ),
+ ],
+ )
+ @mock.patch.object(DagModel, "get_dag_id_to_team_name_mapping")
+ def test_consumer_team_filtering(
+ self,
+ mock_mapping,
+ team_mapping,
+ source_teams,
+ scheduled_dags,
+ allow_consumer_teams,
+ allow_global_consumers,
+ expected_in,
+ ):
+ dag = _make_dag("dag1")
+ mock_mapping.return_value = team_mapping
+
+ result = AssetManager._filter_dags_by_team(
+ dags_to_queue={dag},
+ source_teams=source_teams,
+ asset_model=_make_asset_model(scheduled_dags=scheduled_dags)
+ if scheduled_dags
+ else _make_asset_model(),
+ source_is_api=False,
+ session=mock.Mock(),
+ allow_consumer_teams=allow_consumer_teams,
+ allow_global_consumers=allow_global_consumers,
+ )
+
+ assert (dag in result) == expected_in