This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c7eaa64d92d Fix slow downgrade performance by adding index to deadline.callback_id (#63612)
c7eaa64d92d is described below
commit c7eaa64d92d21b860ef7f85d1cdc0c9efdabb3b2
Author: Yashraj Chouhan <[email protected]>
AuthorDate: Wed Mar 18 20:13:30 2026 +0530
Fix slow downgrade performance by adding index to deadline.callback_id (#63612)
* Fix slow downgrade performance by adding index to deadline.callback_id
This commit adds an index on the callback_id column of the deadline table.
It also modifies the 0094 migration script to create this index during
upgrade and drop it during downgrade. To further optimize the downgrade path
and avoid a full table scan during batch deletions, the deadline_callback_id
foreign key constraint is now dropped before the data is migrated rather
than after.
* Fix MySQL downgrade error by reordering drop operations
* Drop deadline_callback_id_idx after data migration, not before
---
.../0094_3_2_0_replace_deadline_inline_callback_with_fkey.py | 10 +++++++---
airflow-core/src/airflow/models/deadline.py | 5 ++++-
2 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
index 245c8c0cc6f..8c9eddec2cb 100644
--- a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
+++ b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
@@ -187,6 +187,7 @@ def upgrade():
batch_op.alter_column("callback_id", existing_type=sa.Uuid(),
nullable=False)
batch_op.create_index("deadline_missed_deadline_time_idx", ["missed",
"deadline_time"], unique=False)
+ batch_op.create_index("deadline_callback_id_idx", ["callback_id"],
unique=False)
batch_op.drop_index(batch_op.f("deadline_callback_state_time_idx"))
batch_op.create_foreign_key(
batch_op.f("deadline_callback_id_fkey"), "callback",
["callback_id"], ["id"], ondelete="CASCADE"
@@ -199,7 +200,7 @@ def upgrade():
def downgrade():
"""Restore Deadline table's inline callback fields from callback_id
foreign key."""
- from airflow.models.callback import CallbackState
+ from airflow.utils.state import CallbackState
def migrate_batch(conn, deadline_table, callback_table, batch):
deadline_updates = []
@@ -320,17 +321,20 @@ def downgrade():
# Make callback_id nullable so the associated callbacks can be cleared
during migration
batch_op.alter_column("callback_id", existing_type=sa.Uuid(),
nullable=True)
+ batch_op.drop_constraint(batch_op.f("deadline_callback_id_fkey"),
type_="foreignkey")
+ # Note: deadline_callback_id_idx is kept here so it can speed up the
JOIN in migrate_all_data()
+
migrate_all_data()
with op.batch_alter_table("deadline") as batch_op:
# Data for `callback` has been migrated so make it non-nullable
batch_op.alter_column("callback", existing_type=sa.JSON(),
nullable=False)
-
- batch_op.drop_constraint(batch_op.f("deadline_callback_id_fkey"),
type_="foreignkey")
batch_op.create_foreign_key(batch_op.f("deadline_trigger_id_fkey"),
"trigger", ["trigger_id"], ["id"])
batch_op.drop_index("deadline_missed_deadline_time_idx")
batch_op.create_index(
batch_op.f("deadline_callback_state_time_idx"), ["callback_state",
"deadline_time"], unique=False
)
+ # Drop the index after data migration so it can speed up the JOIN
query in migrate_all_data()
+ batch_op.drop_index("deadline_callback_id_idx")
batch_op.drop_column("callback_id")
batch_op.drop_column("missed")
diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py
index ec3ab5ad99c..6a94b21223f 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -117,7 +117,10 @@ class Deadline(Base):
)
deadline_alert: Mapped[DeadlineAlert | None] =
relationship("DeadlineAlert")
- __table_args__ = (Index("deadline_missed_deadline_time_idx", missed,
deadline_time, unique=False),)
+ __table_args__ = (
+ Index("deadline_missed_deadline_time_idx", missed, deadline_time,
unique=False),
+ Index("deadline_callback_id_idx", callback_id, unique=False),
+ )
def __init__(
self,