This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7d95a3c414a Add `allow_consumer_teams` and `allow_global_consumers`
columns to `TaskOutletAssetReference` (#67730)
7d95a3c414a is described below
commit 7d95a3c414a7a947869d3dc66feeb61a290b62f1
Author: Vincent <[email protected]>
AuthorDate: Thu Jun 4 08:22:26 2026 -0700
Add `allow_consumer_teams` and `allow_global_consumers` columns to
`TaskOutletAssetReference` (#67730)
---
airflow-core/docs/migrations-ref.rst | 5 +-
.../src/airflow/dag_processing/collection.py | 26 ++++--
...onsumer_teams_to_task_outlet_asset_reference.py | 56 +++++++++++++
airflow-core/src/airflow/models/asset.py | 4 +
airflow-core/src/airflow/utils/db.py | 2 +-
.../tests/unit/dag_processing/test_collection.py | 93 ++++++++++++++++++++--
6 files changed, 171 insertions(+), 15 deletions(-)
diff --git a/airflow-core/docs/migrations-ref.rst
b/airflow-core/docs/migrations-ref.rst
index 97f2bc988ad..866f521b6cd 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description
|
+=========================+==================+===================+==============================================================+
-| ``a7e6d4c3b2f1`` (head) | ``8812eb67b63c`` | ``3.3.0`` | Add
connection_test_request table for the deferred |
+| ``c9d4e5f6a7b8`` (head) | ``a7e6d4c3b2f1`` | ``3.3.0`` | Add
allow_consumer_teams columns to |
+| | | |
task_outlet_asset_reference table. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``a7e6d4c3b2f1`` | ``8812eb67b63c`` | ``3.3.0`` | Add
connection_test_request table for the deferred |
| | | |
connection-test workflow. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``8812eb67b63c`` | ``acc215baed80`` | ``3.3.0`` | Change
Deadline interval to JSON. |
diff --git a/airflow-core/src/airflow/dag_processing/collection.py
b/airflow-core/src/airflow/dag_processing/collection.py
index 174e0872d20..40da02acc49 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -1027,17 +1027,31 @@ class AssetModelOperation(NamedTuple):
dags[dag_id].task_outlet_asset_references = []
continue
referenced_outlets = {
- (task_id, asset.id)
- for task_id, asset in ((task_id, assets[d.name, d.uri]) for
task_id, d in references)
+ (task_id, assets[d.name, d.uri]): (
+ d.access_control.get("consumer_teams", []),
+ d.access_control.get("allow_global", True),
+ )
+ for task_id, d in references
}
+ referenced_outlet_keys = {(task_id, asset.id) for (task_id, asset)
in referenced_outlets}
orm_refs = {(r.task_id, r.asset_id): r for r in
dags[dag_id].task_outlet_asset_references}
for key, ref in orm_refs.items():
- if key not in referenced_outlets:
+ if key not in referenced_outlet_keys:
session.delete(ref)
+ for (task_id, asset_model), (consumer_teams, allow_global) in
referenced_outlets.items():
+ if (task_id, asset_model.id) in orm_refs:
+ orm_refs[(task_id, asset_model.id)].allow_consumer_teams =
consumer_teams
+ orm_refs[(task_id, asset_model.id)].allow_global_consumers
= allow_global
session.bulk_save_objects(
- TaskOutletAssetReference(asset_id=asset_id, dag_id=dag_id,
task_id=task_id)
- for task_id, asset_id in referenced_outlets
- if (task_id, asset_id) not in orm_refs
+ TaskOutletAssetReference(
+ asset_id=asset_model.id,
+ dag_id=dag_id,
+ task_id=task_id,
+ allow_consumer_teams=consumer_teams,
+ allow_global_consumers=allow_global,
+ )
+ for (task_id, asset_model), (consumer_teams, allow_global) in
referenced_outlets.items()
+ if (task_id, asset_model.id) not in orm_refs
)
def add_asset_trigger_references(
diff --git
a/airflow-core/src/airflow/migrations/versions/0119_3_3_0_add_allow_consumer_teams_to_task_outlet_asset_reference.py
b/airflow-core/src/airflow/migrations/versions/0119_3_3_0_add_allow_consumer_teams_to_task_outlet_asset_reference.py
new file mode 100644
index 00000000000..9501014417a
--- /dev/null
+++
b/airflow-core/src/airflow/migrations/versions/0119_3_3_0_add_allow_consumer_teams_to_task_outlet_asset_reference.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add allow_consumer_teams columns to task_outlet_asset_reference table.
+
+Revision ID: c9d4e5f6a7b8
+Revises: a7e6d4c3b2f1
+Create Date: 2026-05-29 12:00:00.000000
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+revision = "c9d4e5f6a7b8"
+down_revision = "a7e6d4c3b2f1"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+ """Add consumer-team access control columns to
task_outlet_asset_reference."""
+ with op.batch_alter_table("task_outlet_asset_reference", schema=None) as
batch_op:
+ batch_op.add_column(sa.Column("allow_consumer_teams", sa.JSON(),
nullable=True))
+ batch_op.add_column(
+ sa.Column("allow_global_consumers", sa.Boolean(), nullable=False,
server_default=sa.true())
+ )
+
+
+def downgrade():
+ """Remove consumer-team access control columns from
task_outlet_asset_reference."""
+ from airflow.migrations.utils import disable_sqlite_fkeys
+
+ with disable_sqlite_fkeys(op):
+ with op.batch_alter_table("task_outlet_asset_reference", schema=None)
as batch_op:
+ batch_op.drop_column("allow_global_consumers")
+ batch_op.drop_column("allow_consumer_teams")
diff --git a/airflow-core/src/airflow/models/asset.py
b/airflow-core/src/airflow/models/asset.py
index d07fc876b17..573c5479539 100644
--- a/airflow-core/src/airflow/models/asset.py
+++ b/airflow-core/src/airflow/models/asset.py
@@ -649,6 +649,10 @@ class TaskOutletAssetReference(Base):
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True,
nullable=False)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True,
nullable=False)
task_id: Mapped[str] = mapped_column(StringID(), primary_key=True,
nullable=False)
+ allow_consumer_teams: Mapped[list | None] = mapped_column(sa.JSON(),
nullable=True)
+ allow_global_consumers: Mapped[bool] = mapped_column(
+ sa.Boolean(), nullable=False, server_default=sa.true()
+ )
created_at: Mapped[datetime] = mapped_column(UtcDateTime,
default=timezone.utcnow, nullable=False)
updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow,
nullable=False
diff --git a/airflow-core/src/airflow/utils/db.py
b/airflow-core/src/airflow/utils/db.py
index 49db325cdd0..39ca604df15 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -116,7 +116,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
"3.1.0": "cc92b33c6709",
"3.1.8": "509b94a1042d",
"3.2.0": "1d6611b6ab7c",
- "3.3.0": "a7e6d4c3b2f1",
+ "3.3.0": "c9d4e5f6a7b8",
}
# Prefix used to identify tables holding data moved during migration.
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py
b/airflow-core/tests/unit/dag_processing/test_collection.py
index ab2a950a9af..495d1e716fc 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -181,10 +181,10 @@ class TestAssetModelOperation:
def test_sync_assets_preserves_access_control_from_other_bundle(self,
dag_maker, session):
"""When a producer bundle is synced after a consumer bundle that set schedule-side
access_control, the consumer's stored producer access control fields must not be
wiped out, and the producer's outlet-side consumer access control must be stored."""
- from airflow.models.asset import DagScheduleAssetReference
+ from airflow.models.asset import DagScheduleAssetReference,
TaskOutletAssetReference
from airflow.sdk import AssetAccessControl
- # First sync: consumer bundle sets access_control on the asset.
+ # First sync: consumer bundle sets access_control on the asset
(producer_teams on schedule side).
consumer_asset = Asset(
"shared_asset",
access_control=AssetAccessControl(producer_teams=["team1",
"team2"], allow_global=False),
@@ -206,22 +206,101 @@ class TestAssetModelOperation:
assert ref.allow_producer_teams == ["team1", "team2"]
assert ref.allow_global_producers is False
- # Second sync: producer bundle references the same asset WITHOUT
access_control.
- producer_asset = Asset("shared_asset")
+ # Second sync: producer bundle references the same asset with
consumer_teams on the outlet.
+ producer_asset = Asset(
+ "shared_asset",
+ access_control=AssetAccessControl(consumer_teams=["team_ml"],
allow_global=False),
+ )
with dag_maker(dag_id="producer_dag", schedule="@once") as
producer_dag:
EmptyOperator(task_id="produce", outlets=[producer_asset])
producer_dags = {producer_dag.dag_id:
LazyDeserializedDAG.from_dag(producer_dag)}
- DagModelOperation(producer_dags, "testing",
None).add_dags(session=session)
+ producer_orm_dags = DagModelOperation(producer_dags, "testing",
None).add_dags(session=session)
asset_op = AssetModelOperation.collect(producer_dags)
- asset_op.sync_assets(session=session)
+ orm_assets = asset_op.sync_assets(session=session)
+ session.flush()
+ asset_op.add_task_asset_references(producer_orm_dags, orm_assets,
session=session)
session.flush()
- # Consumer's access control must still be preserved.
+ # Consumer's schedule-side access control must still be preserved.
session.expire(ref)
assert ref.allow_producer_teams == ["team1", "team2"]
assert ref.allow_global_producers is False
+ # Producer's outlet-side access control must be stored.
+ outlet_ref = session.scalar(
+
select(TaskOutletAssetReference).where(TaskOutletAssetReference.dag_id ==
"producer_dag")
+ )
+ assert outlet_ref.allow_consumer_teams == ["team_ml"]
+ assert outlet_ref.allow_global_consumers is False
+
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def
test_add_task_outlet_asset_references_updates_consumer_teams_on_change(self,
dag_maker, session):
+ """When access_control changes, existing outlet references are updated
in place."""
+ from airflow.models.asset import TaskOutletAssetReference
+ from airflow.sdk import AssetAccessControl
+
+ asset = Asset(
+ "evolving_asset",
+ access_control=AssetAccessControl(consumer_teams=["team_old"],
allow_global=True),
+ )
+
+ with dag_maker(dag_id="evolving_producer", schedule="@once") as dag:
+ EmptyOperator(task_id="produce", outlets=[asset])
+
+ dags = {dag.dag_id: LazyDeserializedDAG.from_dag(dag)}
+ orm_dags = DagModelOperation(dags, "testing",
None).add_dags(session=session)
+ asset_op = AssetModelOperation.collect(dags)
+ orm_assets = asset_op.sync_assets(session=session)
+ session.flush()
+ asset_op.add_task_asset_references(orm_dags, orm_assets,
session=session)
+ session.flush()
+
+ ref = session.scalar(
+
select(TaskOutletAssetReference).where(TaskOutletAssetReference.dag_id ==
"evolving_producer")
+ )
+ assert ref.allow_consumer_teams == ["team_old"]
+ assert ref.allow_global_consumers is True
+
+ # Change access_control and re-sync.
+ asset.access_control = AssetAccessControl(consumer_teams=["team_new"],
allow_global=False)
+ dags = {dag.dag_id: LazyDeserializedDAG.from_dag(dag)}
+ orm_dags = DagModelOperation(dags, "testing",
None).find_orm_dags(session=session)
+ asset_op = AssetModelOperation.collect(dags)
+ orm_assets = asset_op.sync_assets(session=session)
+ session.flush()
+ asset_op.add_task_asset_references(orm_dags, orm_assets,
session=session)
+ session.flush()
+
+ session.expire(ref)
+ assert ref.allow_consumer_teams == ["team_new"]
+ assert ref.allow_global_consumers is False
+
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def
test_add_task_outlet_asset_references_defaults_when_no_access_control(self,
dag_maker, session):
+ """Outlet references default to empty consumer_teams and
allow_global_consumers=True."""
+ from airflow.models.asset import TaskOutletAssetReference
+
+ asset = Asset("plain_asset")
+
+ with dag_maker(dag_id="plain_producer_dag", schedule="@once") as dag:
+ EmptyOperator(task_id="produce", outlets=[asset])
+
+ dags = {dag.dag_id: LazyDeserializedDAG.from_dag(dag)}
+ orm_dags = DagModelOperation(dags, "testing",
None).add_dags(session=session)
+ asset_op = AssetModelOperation.collect(dags)
+ orm_assets = asset_op.sync_assets(session=session)
+ session.flush()
+ asset_op.add_task_asset_references(orm_dags, orm_assets,
session=session)
+ session.flush()
+
+ ref = session.scalar(
+
select(TaskOutletAssetReference).where(TaskOutletAssetReference.dag_id ==
"plain_producer_dag")
+ )
+ assert ref is not None
+ assert ref.allow_consumer_teams == []
+ assert ref.allow_global_consumers is True
+
@pytest.mark.parametrize(
("is_active", "is_paused", "expected_num_triggers"),
[