This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f72e4c41718 Add `team_name` column to trigger table for multi-team 
triggerer support (#67305)
f72e4c41718 is described below

commit f72e4c417187335b3124a5cfe0766df32b5d6aba
Author: Ramit Kataria <[email protected]>
AuthorDate: Mon May 25 08:59:45 2026 -0700

    Add `team_name` column to trigger table for multi-team triggerer support 
(#67305)
    
    * Add `team_name` column to trigger table for multi-team triggerer support
    
    Adds a nullable `team_name` column (FK -> team.name, ondelete=SET NULL) with
    an index to the trigger table. This is the schema foundation for team-scoped
    triggerer instances. Follow-up PRs will populate the column at trigger
    creation time and add query filtering so each triggerer only processes its
    team's triggers.
    
    When a team is deleted, in-flight triggers become global (NULL) and are
    picked up by the global triggerer to run to completion.
    
    No behavioral change, this PR only adds the column, migration, and
    constructor parameter.
    
    * Add code comment for context about denormalization
---
 airflow-core/docs/migrations-ref.rst               |  4 +-
 .../0116_3_3_0_add_team_name_to_trigger_table.py   | 55 ++++++++++++++++++++++
 airflow-core/src/airflow/models/trigger.py         | 13 ++++-
 airflow-core/src/airflow/utils/db.py               |  2 +-
 airflow-core/tests/unit/models/test_trigger.py     | 11 +++++
 5 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/airflow-core/docs/migrations-ref.rst 
b/airflow-core/docs/migrations-ref.rst
index 42056f2c6cb..82f32c8a2fd 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description 
                                                 |
 
+=========================+==================+===================+==============================================================+
-| ``a1b2c3d4e5f6`` (head) | ``a7f3b2c1d4e5`` | ``3.3.0``         | Add 
version_data to dag_version.                             |
+| ``acc215baed80`` (head) | ``a1b2c3d4e5f6`` | ``3.3.0``         | Add 
team_name to trigger table.                              |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``a1b2c3d4e5f6``        | ``a7f3b2c1d4e5`` | ``3.3.0``         | Add 
version_data to dag_version.                             |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``a7f3b2c1d4e5``        | ``b8f3e4a1d2c9`` | ``3.3.0``         | Add access 
control columns to dag_schedule_asset_reference   |
 |                         |                  |                   | table.      
                                                 |
diff --git 
a/airflow-core/src/airflow/migrations/versions/0116_3_3_0_add_team_name_to_trigger_table.py
 
b/airflow-core/src/airflow/migrations/versions/0116_3_3_0_add_team_name_to_trigger_table.py
new file mode 100644
index 00000000000..503f1b58d7c
--- /dev/null
+++ 
b/airflow-core/src/airflow/migrations/versions/0116_3_3_0_add_team_name_to_trigger_table.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add team_name to trigger table.
+
+Revision ID: acc215baed80
+Revises: a1b2c3d4e5f6
+Create Date: 2026-05-21 21:38:00.122692
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "acc215baed80"
+down_revision = "a1b2c3d4e5f6"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+    """Add team_name to trigger table."""
+    with op.batch_alter_table("trigger", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("team_name", sa.String(length=50), 
nullable=True))
+        batch_op.create_index(batch_op.f("idx_trigger_team_name"), 
["team_name"], unique=False)
+        batch_op.create_foreign_key(
+            batch_op.f("trigger_team_name_fkey"), "team", ["team_name"], 
["name"], ondelete="SET NULL"
+        )
+
+
+def downgrade():
+    """Remove team_name from trigger table."""
+    with op.batch_alter_table("trigger", schema=None) as batch_op:
+        batch_op.drop_constraint(batch_op.f("trigger_team_name_fkey"), 
type_="foreignkey")
+        batch_op.drop_index(batch_op.f("idx_trigger_team_name"))
+        batch_op.drop_column("team_name")
diff --git a/airflow-core/src/airflow/models/trigger.py 
b/airflow-core/src/airflow/models/trigger.py
index a6ee583032a..0d15d5ac497 100644
--- a/airflow-core/src/airflow/models/trigger.py
+++ b/airflow-core/src/airflow/models/trigger.py
@@ -24,7 +24,7 @@ from functools import singledispatch
 from traceback import format_exception
 from typing import TYPE_CHECKING, Any
 
-from sqlalchemy import Integer, String, Text, delete, func, or_, select, update
+from sqlalchemy import ForeignKey, Integer, String, Text, delete, func, or_, 
select, update
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.orm import Mapped, Session, mapped_column, relationship, 
selectinload
 from sqlalchemy.sql.functions import coalesce
@@ -100,6 +100,15 @@ class Trigger(Base):
     triggerer_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
     queue: Mapped[str | None] = mapped_column(String(256), nullable=True)
 
+    # Denormalized from dag_bundle_team to keep the triggerer's ~1s polling 
queries join-free,
+    # especially since it's eventually consistent and trigger rows are 
ephemeral.
+    # Without this, filtering by team requires 2-3 joins depending on trigger 
type.
+    # Performance testing confirmed the denormalized column avoids measurable 
overhead in the
+    # triggerer loop under load.
+    team_name: Mapped[str | None] = mapped_column(
+        String(50), ForeignKey("team.name", ondelete="SET NULL"), 
nullable=True, index=True
+    )
+
     triggerer_job = relationship(
         "Job",
         primaryjoin="Job.id == Trigger.triggerer_id",
@@ -122,12 +131,14 @@ class Trigger(Base):
         kwargs: dict[str, Any],
         created_date: datetime.datetime | None = None,
         queue: str | None = None,
+        team_name: str | None = None,
     ) -> None:
         super().__init__()
         self.classpath = classpath
         self.encrypted_kwargs = self.encrypt_kwargs(kwargs)
         self.created_date = created_date or timezone.utcnow()
         self.queue = queue
+        self.team_name = team_name
 
     @property
     def kwargs(self) -> dict[str, Any]:
diff --git a/airflow-core/src/airflow/utils/db.py 
b/airflow-core/src/airflow/utils/db.py
index 9b747d7fc3f..00d512909dc 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -116,7 +116,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
     "3.1.0": "cc92b33c6709",
     "3.1.8": "509b94a1042d",
     "3.2.0": "1d6611b6ab7c",
-    "3.3.0": "a1b2c3d4e5f6",
+    "3.3.0": "acc215baed80",
 }
 
 # Prefix used to identify tables holding data moved during migration.
diff --git a/airflow-core/tests/unit/models/test_trigger.py 
b/airflow-core/tests/unit/models/test_trigger.py
index c79a021f8b8..5ee079f1f0f 100644
--- a/airflow-core/tests/unit/models/test_trigger.py
+++ b/airflow-core/tests/unit/models/test_trigger.py
@@ -83,6 +83,17 @@ def clear_db(session):
     session.commit()
 
 
+def test_trigger_team_name_stored(session, testing_team):
+    trigger = Trigger(
+        classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}, 
team_name=testing_team.name
+    )
+    session.add(trigger)
+    session.flush()
+
+    loaded = session.get(Trigger, trigger.id)
+    assert loaded.team_name == "testing"
+
+
 def test_fetch_trigger_ids_with_non_task_associations(session):
     # Create triggers
     asset_trigger = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger1", kwargs={})

Reply via email to