This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d95a3c414a Add `allow_consumer_teams` and `allow_global_consumers` 
columns to `TaskOutletAssetReference` (#67730)
7d95a3c414a is described below

commit 7d95a3c414a7a947869d3dc66feeb61a290b62f1
Author: Vincent <[email protected]>
AuthorDate: Thu Jun 4 08:22:26 2026 -0700

    Add `allow_consumer_teams` and `allow_global_consumers` columns to 
`TaskOutletAssetReference` (#67730)
---
 airflow-core/docs/migrations-ref.rst               |  5 +-
 .../src/airflow/dag_processing/collection.py       | 26 ++++--
 ...onsumer_teams_to_task_outlet_asset_reference.py | 56 +++++++++++++
 airflow-core/src/airflow/models/asset.py           |  4 +
 airflow-core/src/airflow/utils/db.py               |  2 +-
 .../tests/unit/dag_processing/test_collection.py   | 93 ++++++++++++++++++++--
 6 files changed, 171 insertions(+), 15 deletions(-)

diff --git a/airflow-core/docs/migrations-ref.rst 
b/airflow-core/docs/migrations-ref.rst
index 97f2bc988ad..866f521b6cd 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description 
                                                 |
 
+=========================+==================+===================+==============================================================+
-| ``a7e6d4c3b2f1`` (head) | ``8812eb67b63c`` | ``3.3.0``         | Add 
connection_test_request table for the deferred           |
+| ``c9d4e5f6a7b8`` (head) | ``a7e6d4c3b2f1`` | ``3.3.0``         | Add 
allow_consumer_teams columns to                          |
+|                         |                  |                   | 
task_outlet_asset_reference table.                           |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``a7e6d4c3b2f1``        | ``8812eb67b63c`` | ``3.3.0``         | Add 
connection_test_request table for the deferred           |
 |                         |                  |                   | 
connection-test workflow.                                    |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``8812eb67b63c``        | ``acc215baed80`` | ``3.3.0``         | Change 
Deadline interval to JSON.                            |
diff --git a/airflow-core/src/airflow/dag_processing/collection.py 
b/airflow-core/src/airflow/dag_processing/collection.py
index 174e0872d20..40da02acc49 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -1027,17 +1027,31 @@ class AssetModelOperation(NamedTuple):
                 dags[dag_id].task_outlet_asset_references = []
                 continue
             referenced_outlets = {
-                (task_id, asset.id)
-                for task_id, asset in ((task_id, assets[d.name, d.uri]) for 
task_id, d in references)
+                (task_id, assets[d.name, d.uri]): (
+                    d.access_control.get("consumer_teams", []),
+                    d.access_control.get("allow_global", True),
+                )
+                for task_id, d in references
             }
+            referenced_outlet_keys = {(task_id, asset.id) for (task_id, asset) 
in referenced_outlets}
             orm_refs = {(r.task_id, r.asset_id): r for r in 
dags[dag_id].task_outlet_asset_references}
             for key, ref in orm_refs.items():
-                if key not in referenced_outlets:
+                if key not in referenced_outlet_keys:
                     session.delete(ref)
+            for (task_id, asset_model), (consumer_teams, allow_global) in 
referenced_outlets.items():
+                if (task_id, asset_model.id) in orm_refs:
+                    orm_refs[(task_id, asset_model.id)].allow_consumer_teams = 
consumer_teams
+                    orm_refs[(task_id, asset_model.id)].allow_global_consumers 
= allow_global
             session.bulk_save_objects(
-                TaskOutletAssetReference(asset_id=asset_id, dag_id=dag_id, 
task_id=task_id)
-                for task_id, asset_id in referenced_outlets
-                if (task_id, asset_id) not in orm_refs
+                TaskOutletAssetReference(
+                    asset_id=asset_model.id,
+                    dag_id=dag_id,
+                    task_id=task_id,
+                    allow_consumer_teams=consumer_teams,
+                    allow_global_consumers=allow_global,
+                )
+                for (task_id, asset_model), (consumer_teams, allow_global) in 
referenced_outlets.items()
+                if (task_id, asset_model.id) not in orm_refs
             )
 
     def add_asset_trigger_references(
diff --git 
a/airflow-core/src/airflow/migrations/versions/0119_3_3_0_add_allow_consumer_teams_to_task_outlet_asset_reference.py
 
b/airflow-core/src/airflow/migrations/versions/0119_3_3_0_add_allow_consumer_teams_to_task_outlet_asset_reference.py
new file mode 100644
index 00000000000..9501014417a
--- /dev/null
+++ 
b/airflow-core/src/airflow/migrations/versions/0119_3_3_0_add_allow_consumer_teams_to_task_outlet_asset_reference.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add allow_consumer_teams columns to task_outlet_asset_reference table.
+
+Revision ID: c9d4e5f6a7b8
+Revises: a7e6d4c3b2f1
+Create Date: 2026-05-29 12:00:00.000000
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+revision = "c9d4e5f6a7b8"
+down_revision = "a7e6d4c3b2f1"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+    """Add consumer-team access control columns to 
task_outlet_asset_reference."""
+    with op.batch_alter_table("task_outlet_asset_reference", schema=None) as 
batch_op:
+        batch_op.add_column(sa.Column("allow_consumer_teams", sa.JSON(), 
nullable=True))
+        batch_op.add_column(
+            sa.Column("allow_global_consumers", sa.Boolean(), nullable=False, 
server_default=sa.true())
+        )
+
+
+def downgrade():
+    """Remove consumer-team access control columns from 
task_outlet_asset_reference."""
+    from airflow.migrations.utils import disable_sqlite_fkeys
+
+    with disable_sqlite_fkeys(op):
+        with op.batch_alter_table("task_outlet_asset_reference", schema=None) 
as batch_op:
+            batch_op.drop_column("allow_global_consumers")
+            batch_op.drop_column("allow_consumer_teams")
diff --git a/airflow-core/src/airflow/models/asset.py 
b/airflow-core/src/airflow/models/asset.py
index d07fc876b17..573c5479539 100644
--- a/airflow-core/src/airflow/models/asset.py
+++ b/airflow-core/src/airflow/models/asset.py
@@ -649,6 +649,10 @@ class TaskOutletAssetReference(Base):
     asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, 
nullable=False)
     dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, 
nullable=False)
     task_id: Mapped[str] = mapped_column(StringID(), primary_key=True, 
nullable=False)
+    allow_consumer_teams: Mapped[list | None] = mapped_column(sa.JSON(), 
nullable=True)
+    allow_global_consumers: Mapped[bool] = mapped_column(
+        sa.Boolean(), nullable=False, server_default=sa.true()
+    )
     created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
     updated_at: Mapped[datetime] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, 
nullable=False
diff --git a/airflow-core/src/airflow/utils/db.py 
b/airflow-core/src/airflow/utils/db.py
index 49db325cdd0..39ca604df15 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -116,7 +116,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
     "3.1.0": "cc92b33c6709",
     "3.1.8": "509b94a1042d",
     "3.2.0": "1d6611b6ab7c",
-    "3.3.0": "a7e6d4c3b2f1",
+    "3.3.0": "c9d4e5f6a7b8",
 }
 
 # Prefix used to identify tables holding data moved during migration.
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py 
b/airflow-core/tests/unit/dag_processing/test_collection.py
index ab2a950a9af..495d1e716fc 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -181,10 +181,10 @@ class TestAssetModelOperation:
     def test_sync_assets_preserves_access_control_from_other_bundle(self, 
dag_maker, session):
         """When a producer bundle (declaring only outlet-side consumer_teams) is synced after a 
consumer bundle
         (declaring producer_teams), the stored schedule-side access control fields must not be 
wiped out."""
-        from airflow.models.asset import DagScheduleAssetReference
+        from airflow.models.asset import DagScheduleAssetReference, 
TaskOutletAssetReference
         from airflow.sdk import AssetAccessControl
 
-        # First sync: consumer bundle sets access_control on the asset.
+        # First sync: consumer bundle sets access_control on the asset 
(producer_teams on schedule side).
         consumer_asset = Asset(
             "shared_asset",
             access_control=AssetAccessControl(producer_teams=["team1", 
"team2"], allow_global=False),
@@ -206,22 +206,101 @@ class TestAssetModelOperation:
         assert ref.allow_producer_teams == ["team1", "team2"]
         assert ref.allow_global_producers is False
 
-        # Second sync: producer bundle references the same asset WITHOUT 
access_control.
-        producer_asset = Asset("shared_asset")
+        # Second sync: producer bundle references the same asset with 
consumer_teams on the outlet.
+        producer_asset = Asset(
+            "shared_asset",
+            access_control=AssetAccessControl(consumer_teams=["team_ml"], 
allow_global=False),
+        )
         with dag_maker(dag_id="producer_dag", schedule="@once") as 
producer_dag:
             EmptyOperator(task_id="produce", outlets=[producer_asset])
 
         producer_dags = {producer_dag.dag_id: 
LazyDeserializedDAG.from_dag(producer_dag)}
-        DagModelOperation(producer_dags, "testing", 
None).add_dags(session=session)
+        producer_orm_dags = DagModelOperation(producer_dags, "testing", 
None).add_dags(session=session)
         asset_op = AssetModelOperation.collect(producer_dags)
-        asset_op.sync_assets(session=session)
+        orm_assets = asset_op.sync_assets(session=session)
+        session.flush()
+        asset_op.add_task_asset_references(producer_orm_dags, orm_assets, 
session=session)
         session.flush()
 
-        # Consumer's access control must still be preserved.
+        # Consumer's schedule-side access control must still be preserved.
         session.expire(ref)
         assert ref.allow_producer_teams == ["team1", "team2"]
         assert ref.allow_global_producers is False
 
+        # Producer's outlet-side access control must be stored.
+        outlet_ref = session.scalar(
+            
select(TaskOutletAssetReference).where(TaskOutletAssetReference.dag_id == 
"producer_dag")
+        )
+        assert outlet_ref.allow_consumer_teams == ["team_ml"]
+        assert outlet_ref.allow_global_consumers is False
+
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def 
test_add_task_outlet_asset_references_updates_consumer_teams_on_change(self, 
dag_maker, session):
+        """When access_control changes, existing outlet references are updated 
in place."""
+        from airflow.models.asset import TaskOutletAssetReference
+        from airflow.sdk import AssetAccessControl
+
+        asset = Asset(
+            "evolving_asset",
+            access_control=AssetAccessControl(consumer_teams=["team_old"], 
allow_global=True),
+        )
+
+        with dag_maker(dag_id="evolving_producer", schedule="@once") as dag:
+            EmptyOperator(task_id="produce", outlets=[asset])
+
+        dags = {dag.dag_id: LazyDeserializedDAG.from_dag(dag)}
+        orm_dags = DagModelOperation(dags, "testing", 
None).add_dags(session=session)
+        asset_op = AssetModelOperation.collect(dags)
+        orm_assets = asset_op.sync_assets(session=session)
+        session.flush()
+        asset_op.add_task_asset_references(orm_dags, orm_assets, 
session=session)
+        session.flush()
+
+        ref = session.scalar(
+            
select(TaskOutletAssetReference).where(TaskOutletAssetReference.dag_id == 
"evolving_producer")
+        )
+        assert ref.allow_consumer_teams == ["team_old"]
+        assert ref.allow_global_consumers is True
+
+        # Change access_control and re-sync.
+        asset.access_control = AssetAccessControl(consumer_teams=["team_new"], 
allow_global=False)
+        dags = {dag.dag_id: LazyDeserializedDAG.from_dag(dag)}
+        orm_dags = DagModelOperation(dags, "testing", 
None).find_orm_dags(session=session)
+        asset_op = AssetModelOperation.collect(dags)
+        orm_assets = asset_op.sync_assets(session=session)
+        session.flush()
+        asset_op.add_task_asset_references(orm_dags, orm_assets, 
session=session)
+        session.flush()
+
+        session.expire(ref)
+        assert ref.allow_consumer_teams == ["team_new"]
+        assert ref.allow_global_consumers is False
+
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def 
test_add_task_outlet_asset_references_defaults_when_no_access_control(self, 
dag_maker, session):
+        """Outlet references default to empty consumer_teams and 
allow_global_consumers=True."""
+        from airflow.models.asset import TaskOutletAssetReference
+
+        asset = Asset("plain_asset")
+
+        with dag_maker(dag_id="plain_producer_dag", schedule="@once") as dag:
+            EmptyOperator(task_id="produce", outlets=[asset])
+
+        dags = {dag.dag_id: LazyDeserializedDAG.from_dag(dag)}
+        orm_dags = DagModelOperation(dags, "testing", 
None).add_dags(session=session)
+        asset_op = AssetModelOperation.collect(dags)
+        orm_assets = asset_op.sync_assets(session=session)
+        session.flush()
+        asset_op.add_task_asset_references(orm_dags, orm_assets, 
session=session)
+        session.flush()
+
+        ref = session.scalar(
+            
select(TaskOutletAssetReference).where(TaskOutletAssetReference.dag_id == 
"plain_producer_dag")
+        )
+        assert ref is not None
+        assert ref.allow_consumer_teams == []
+        assert ref.allow_global_consumers is True
+
     @pytest.mark.parametrize(
         ("is_active", "is_paused", "expected_num_triggers"),
         [

Reply via email to